fix: fix cron timeout clean
This commit is contained in:
parent
c8501b1768
commit
c858cc7d44
@ -10,8 +10,11 @@ use async_trait::async_trait;
|
|||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use croner::Cron;
|
use croner::Cron;
|
||||||
use sea_orm::{
|
use sea_orm::{
|
||||||
ActiveValue::Set, DeriveActiveEnum, DeriveDisplay, DeriveEntityModel, EnumIter, QuerySelect,
|
ActiveValue::Set,
|
||||||
Statement, TransactionTrait, entity::prelude::*, sea_query::LockType,
|
Condition, DeriveActiveEnum, DeriveDisplay, DeriveEntityModel, EnumIter, QuerySelect,
|
||||||
|
Statement, TransactionTrait,
|
||||||
|
entity::prelude::*,
|
||||||
|
sea_query::{ExprTrait, LockBehavior, LockType},
|
||||||
sqlx::postgres::PgNotification,
|
sqlx::postgres::PgNotification,
|
||||||
};
|
};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@ -126,6 +129,7 @@ impl Model {
|
|||||||
ctx: &dyn AppContextTrait,
|
ctx: &dyn AppContextTrait,
|
||||||
notification: PgNotification,
|
notification: PgNotification,
|
||||||
worker_id: &str,
|
worker_id: &str,
|
||||||
|
retry_duration: chrono::Duration,
|
||||||
) -> RecorderResult<()> {
|
) -> RecorderResult<()> {
|
||||||
let payload: Self = serde_json::from_str(notification.payload())?;
|
let payload: Self = serde_json::from_str(notification.payload())?;
|
||||||
let cron_id = payload.id;
|
let cron_id = payload.id;
|
||||||
@ -140,7 +144,8 @@ impl Model {
|
|||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("Error executing cron {cron_id}: {e}");
|
tracing::error!("Error executing cron {cron_id}: {e}");
|
||||||
cron.mark_cron_failed(ctx, &e.to_string()).await?;
|
cron.mark_cron_failed(ctx, &e.to_string(), retry_duration)
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
None => {
|
None => {
|
||||||
@ -162,7 +167,7 @@ impl Model {
|
|||||||
let txn = db.begin().await?;
|
let txn = db.begin().await?;
|
||||||
|
|
||||||
let cron = Entity::find_by_id(cron_id)
|
let cron = Entity::find_by_id(cron_id)
|
||||||
.lock(LockType::Update)
|
.lock_with_behavior(LockType::Update, LockBehavior::SkipLocked)
|
||||||
.one(&txn)
|
.one(&txn)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@ -235,7 +240,12 @@ impl Model {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn mark_cron_failed(&self, ctx: &dyn AppContextTrait, error: &str) -> RecorderResult<()> {
|
async fn mark_cron_failed(
|
||||||
|
&self,
|
||||||
|
ctx: &dyn AppContextTrait,
|
||||||
|
error: &str,
|
||||||
|
retry_duration: chrono::Duration,
|
||||||
|
) -> RecorderResult<()> {
|
||||||
let db = ctx.db();
|
let db = ctx.db();
|
||||||
|
|
||||||
let should_retry = self.attempts < self.max_attempts;
|
let should_retry = self.attempts < self.max_attempts;
|
||||||
@ -247,7 +257,7 @@ impl Model {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let next_run = if should_retry {
|
let next_run = if should_retry {
|
||||||
Some(Utc::now() + chrono::Duration::seconds(5))
|
Some(Utc::now() + retry_duration)
|
||||||
} else {
|
} else {
|
||||||
Some(self.calculate_next_run(&self.cron_expr)?)
|
Some(self.calculate_next_run(&self.cron_expr)?)
|
||||||
};
|
};
|
||||||
@ -281,6 +291,55 @@ impl Model {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn check_and_cleanup_expired_cron_locks(
|
||||||
|
ctx: &dyn AppContextTrait,
|
||||||
|
retry_duration: chrono::Duration,
|
||||||
|
) -> RecorderResult<()> {
|
||||||
|
let db = ctx.db();
|
||||||
|
|
||||||
|
let condition = Condition::all()
|
||||||
|
.add(Column::Status.eq(CronStatus::Running))
|
||||||
|
.add(Column::LastRun.is_not_null())
|
||||||
|
.add(Column::TimeoutMs.is_not_null())
|
||||||
|
.add(
|
||||||
|
Expr::col(Column::LastRun)
|
||||||
|
.add(Expr::col(Column::TimeoutMs).mul(Expr::cust("INTERVAL '1 millisecond'")))
|
||||||
|
.lte(Expr::current_timestamp()),
|
||||||
|
);
|
||||||
|
|
||||||
|
let cron_ids = Entity::find()
|
||||||
|
.select_only()
|
||||||
|
.column(Column::Id)
|
||||||
|
.filter(condition.clone())
|
||||||
|
.lock_with_behavior(LockType::Update, LockBehavior::SkipLocked)
|
||||||
|
.into_tuple::<i32>()
|
||||||
|
.all(db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
for cron_id in cron_ids {
|
||||||
|
let txn = db.begin().await?;
|
||||||
|
let locked_cron = Entity::find_by_id(cron_id)
|
||||||
|
.filter(condition.clone())
|
||||||
|
.lock_with_behavior(LockType::Update, LockBehavior::SkipLocked)
|
||||||
|
.one(&txn)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if let Some(locked_cron) = locked_cron {
|
||||||
|
locked_cron
|
||||||
|
.mark_cron_failed(
|
||||||
|
ctx,
|
||||||
|
format!("Cron timeout of {}ms", locked_cron.timeout_ms).as_str(),
|
||||||
|
retry_duration,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
txn.commit().await?;
|
||||||
|
} else {
|
||||||
|
txn.rollback().await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn calculate_next_run(&self, cron_expr: &str) -> RecorderResult<DateTime<Utc>> {
|
fn calculate_next_run(&self, cron_expr: &str) -> RecorderResult<DateTime<Utc>> {
|
||||||
let cron_expr = Cron::new(cron_expr).parse()?;
|
let cron_expr = Cron::new(cron_expr).parse()?;
|
||||||
|
|
||||||
|
@ -12,6 +12,8 @@ pub struct TaskConfig {
|
|||||||
pub subscriber_task_timeout: Duration,
|
pub subscriber_task_timeout: Duration,
|
||||||
#[serde(default = "default_system_task_timeout")]
|
#[serde(default = "default_system_task_timeout")]
|
||||||
pub system_task_timeout: Duration,
|
pub system_task_timeout: Duration,
|
||||||
|
#[serde(default = "default_cron_retry_duration")]
|
||||||
|
pub cron_retry_duration: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for TaskConfig {
|
impl Default for TaskConfig {
|
||||||
@ -21,6 +23,7 @@ impl Default for TaskConfig {
|
|||||||
system_task_concurrency: default_system_task_workers(),
|
system_task_concurrency: default_system_task_workers(),
|
||||||
subscriber_task_timeout: default_subscriber_task_timeout(),
|
subscriber_task_timeout: default_subscriber_task_timeout(),
|
||||||
system_task_timeout: default_system_task_timeout(),
|
system_task_timeout: default_system_task_timeout(),
|
||||||
|
cron_retry_duration: default_cron_retry_duration(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -48,3 +51,7 @@ pub fn default_subscriber_task_timeout() -> Duration {
|
|||||||
pub fn default_system_task_timeout() -> Duration {
|
pub fn default_system_task_timeout() -> Duration {
|
||||||
Duration::from_secs(3600)
|
Duration::from_secs(3600)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn default_cron_retry_duration() -> Duration {
|
||||||
|
Duration::from_secs(5)
|
||||||
|
}
|
||||||
|
@ -170,9 +170,14 @@ impl TaskService {
|
|||||||
let listener = self.setup_cron_due_listening().await?;
|
let listener = self.setup_cron_due_listening().await?;
|
||||||
let ctx = self.ctx.clone();
|
let ctx = self.ctx.clone();
|
||||||
let cron_worker_id = self.cron_worker_id.clone();
|
let cron_worker_id = self.cron_worker_id.clone();
|
||||||
|
let retry_duration = chrono::Duration::milliseconds(
|
||||||
|
self.config.cron_retry_duration.as_millis() as i64,
|
||||||
|
);
|
||||||
|
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
if let Err(e) = Self::listen_cron_due(listener, ctx, &cron_worker_id).await {
|
if let Err(e) =
|
||||||
|
Self::listen_cron_due(listener, ctx, &cron_worker_id, retry_duration).await
|
||||||
|
{
|
||||||
tracing::error!("Error listening to cron due: {e}");
|
tracing::error!("Error listening to cron due: {e}");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -180,11 +185,24 @@ impl TaskService {
|
|||||||
Ok::<_, RecorderError>(())
|
Ok::<_, RecorderError>(())
|
||||||
},
|
},
|
||||||
async {
|
async {
|
||||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
|
|
||||||
let ctx = self.ctx.clone();
|
let ctx = self.ctx.clone();
|
||||||
|
let retry_duration = chrono::Duration::milliseconds(
|
||||||
|
self.config.cron_retry_duration.as_millis() as i64,
|
||||||
|
);
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
|
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
|
if let Err(e) = cron::Model::check_and_cleanup_expired_cron_locks(
|
||||||
|
ctx.as_ref(),
|
||||||
|
retry_duration,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!(
|
||||||
|
"Error checking and cleaning up expired cron locks: {e}"
|
||||||
|
);
|
||||||
|
}
|
||||||
if let Err(e) = cron::Model::check_and_trigger_due_crons(ctx.as_ref()).await
|
if let Err(e) = cron::Model::check_and_trigger_due_crons(ctx.as_ref()).await
|
||||||
{
|
{
|
||||||
tracing::error!("Error checking and triggering due crons: {e}");
|
tracing::error!("Error checking and triggering due crons: {e}");
|
||||||
@ -257,12 +275,19 @@ impl TaskService {
|
|||||||
mut listener: PgListener,
|
mut listener: PgListener,
|
||||||
ctx: Arc<dyn AppContextTrait>,
|
ctx: Arc<dyn AppContextTrait>,
|
||||||
worker_id: &str,
|
worker_id: &str,
|
||||||
|
retry_duration: chrono::Duration,
|
||||||
) -> RecorderResult<()> {
|
) -> RecorderResult<()> {
|
||||||
listener.listen(CRON_DUE_EVENT).await?;
|
listener.listen(CRON_DUE_EVENT).await?;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let notification = listener.recv().await?;
|
let notification = listener.recv().await?;
|
||||||
if let Err(e) =
|
if let Err(e) = cron::Model::handle_cron_notification(
|
||||||
cron::Model::handle_cron_notification(ctx.as_ref(), notification, worker_id).await
|
ctx.as_ref(),
|
||||||
|
notification,
|
||||||
|
worker_id,
|
||||||
|
retry_duration,
|
||||||
|
)
|
||||||
|
.await
|
||||||
{
|
{
|
||||||
tracing::error!("Error handling cron notification: {e}");
|
tracing::error!("Error handling cron notification: {e}");
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user