refactor: refactor graphql more
This commit is contained in:
@@ -1,9 +1,5 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub const CRON_DUE_EVENT: &str = "cron_due";
|
||||
|
||||
pub const CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME: &str =
|
||||
"check_and_cleanup_expired_cron_locks";
|
||||
pub const CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME: &str = "check_and_trigger_due_crons";
|
||||
|
||||
pub const NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME: &str = "notify_due_cron_when_mutating";
|
||||
@@ -12,12 +8,3 @@ pub const NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME: &str =
|
||||
pub const SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME: &str = "setup_cron_extra_foreign_keys";
|
||||
pub const SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME: &str =
|
||||
"setup_cron_extra_foreign_keys_trigger";
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct CronCreateOptions {
|
||||
pub cron_expr: String,
|
||||
pub priority: Option<i32>,
|
||||
pub timeout_ms: Option<i32>,
|
||||
pub max_attempts: Option<i32>,
|
||||
pub enabled: Option<bool>,
|
||||
}
|
||||
|
||||
@@ -2,23 +2,28 @@ mod core;
|
||||
mod registry;
|
||||
|
||||
pub use core::{
|
||||
CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME,
|
||||
CRON_DUE_EVENT, CronCreateOptions, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME,
|
||||
NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME,
|
||||
SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME,
|
||||
CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT,
|
||||
NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME,
|
||||
SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use croner::Cron;
|
||||
use sea_orm::{
|
||||
ActiveValue::Set, DeriveActiveEnum, DeriveDisplay, DeriveEntityModel, EnumIter, QuerySelect,
|
||||
Statement, TransactionTrait, entity::prelude::*, sea_query::LockType,
|
||||
ActiveValue::{self, Set},
|
||||
Condition, DeriveActiveEnum, DeriveDisplay, DeriveEntityModel, EnumIter, QuerySelect,
|
||||
Statement, TransactionTrait,
|
||||
entity::prelude::*,
|
||||
sea_query::{ExprTrait, LockBehavior, LockType},
|
||||
sqlx::postgres::PgNotification,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{app::AppContextTrait, errors::RecorderResult, models::subscriber_tasks};
|
||||
use crate::{
|
||||
app::AppContextTrait, errors::RecorderResult, models::subscriber_tasks,
|
||||
task::SubscriberTaskTrait,
|
||||
};
|
||||
|
||||
#[derive(
|
||||
Debug, Clone, PartialEq, Eq, DeriveActiveEnum, EnumIter, DeriveDisplay, Serialize, Deserialize,
|
||||
@@ -107,46 +112,47 @@ pub enum RelatedEntity {
|
||||
Subscription,
|
||||
}
|
||||
|
||||
impl ActiveModel {
|
||||
pub fn from_subscriber_task(
|
||||
subscriber_task: subscriber_tasks::SubscriberTask,
|
||||
cron_options: CronCreateOptions,
|
||||
) -> RecorderResult<Self> {
|
||||
let mut active_model = Self {
|
||||
next_run: Set(Some(Model::calculate_next_run(&cron_options.cron_expr)?)),
|
||||
cron_expr: Set(cron_options.cron_expr),
|
||||
subscriber_task: Set(Some(subscriber_task)),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
if let Some(priority) = cron_options.priority {
|
||||
active_model.priority = Set(priority);
|
||||
#[async_trait]
|
||||
impl ActiveModelBehavior for ActiveModel {
|
||||
async fn before_save<C>(mut self, _db: &C, _insert: bool) -> Result<Self, DbErr>
|
||||
where
|
||||
C: ConnectionTrait,
|
||||
{
|
||||
if let ActiveValue::Set(ref cron_expr) = self.cron_expr
|
||||
&& matches!(
|
||||
self.next_run,
|
||||
ActiveValue::NotSet | ActiveValue::Unchanged(_)
|
||||
)
|
||||
{
|
||||
let next_run =
|
||||
Model::calculate_next_run(cron_expr).map_err(|e| DbErr::Custom(e.to_string()))?;
|
||||
self.next_run = Set(Some(next_run));
|
||||
}
|
||||
if let ActiveValue::Set(Some(subscriber_id)) = self.subscriber_id {
|
||||
if let ActiveValue::Set(Some(ref subscriber_task)) = self.subscriber_task {
|
||||
if subscriber_task.get_subscriber_id() != subscriber_id {
|
||||
return Err(DbErr::Custom(
|
||||
"Subscriber task subscriber_id does not match cron subscriber_id"
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
} else {
|
||||
return Err(DbErr::Custom(
|
||||
"Cron subscriber_id is set but subscriber_task is not set".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(timeout_ms) = cron_options.timeout_ms {
|
||||
active_model.timeout_ms = Set(timeout_ms);
|
||||
}
|
||||
|
||||
if let Some(max_attempts) = cron_options.max_attempts {
|
||||
active_model.max_attempts = Set(max_attempts);
|
||||
}
|
||||
|
||||
if let Some(enabled) = cron_options.enabled {
|
||||
active_model.enabled = Set(enabled);
|
||||
}
|
||||
|
||||
Ok(active_model)
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
|
||||
impl Model {
|
||||
pub async fn handle_cron_notification(
|
||||
ctx: &dyn AppContextTrait,
|
||||
notification: PgNotification,
|
||||
worker_id: &str,
|
||||
retry_duration: chrono::Duration,
|
||||
) -> RecorderResult<()> {
|
||||
let payload: Self = serde_json::from_str(notification.payload())?;
|
||||
let cron_id = payload.id;
|
||||
@@ -161,7 +167,8 @@ impl Model {
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Error executing cron {cron_id}: {e}");
|
||||
cron.mark_cron_failed(ctx, &e.to_string()).await?;
|
||||
cron.mark_cron_failed(ctx, &e.to_string(), retry_duration)
|
||||
.await?;
|
||||
}
|
||||
},
|
||||
None => {
|
||||
@@ -183,7 +190,7 @@ impl Model {
|
||||
let txn = db.begin().await?;
|
||||
|
||||
let cron = Entity::find_by_id(cron_id)
|
||||
.lock(LockType::Update)
|
||||
.lock_with_behavior(LockType::Update, LockBehavior::SkipLocked)
|
||||
.one(&txn)
|
||||
.await?;
|
||||
|
||||
@@ -250,7 +257,12 @@ impl Model {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn mark_cron_failed(&self, ctx: &dyn AppContextTrait, error: &str) -> RecorderResult<()> {
|
||||
async fn mark_cron_failed(
|
||||
&self,
|
||||
ctx: &dyn AppContextTrait,
|
||||
error: &str,
|
||||
retry_duration: chrono::Duration,
|
||||
) -> RecorderResult<()> {
|
||||
let db = ctx.db();
|
||||
|
||||
let should_retry = self.attempts < self.max_attempts;
|
||||
@@ -262,7 +274,7 @@ impl Model {
|
||||
};
|
||||
|
||||
let next_run = if should_retry {
|
||||
Some(Utc::now() + chrono::Duration::seconds(5))
|
||||
Some(Utc::now() + retry_duration)
|
||||
} else {
|
||||
Some(Self::calculate_next_run(&self.cron_expr)?)
|
||||
};
|
||||
@@ -284,19 +296,6 @@ impl Model {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn cleanup_expired_locks(ctx: &dyn AppContextTrait) -> RecorderResult<i32> {
|
||||
let db = ctx.db();
|
||||
|
||||
let result = db
|
||||
.execute(Statement::from_string(
|
||||
db.get_database_backend(),
|
||||
format!("SELECT {CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME}()"),
|
||||
))
|
||||
.await?;
|
||||
|
||||
Ok(result.rows_affected() as i32)
|
||||
}
|
||||
|
||||
pub async fn check_and_trigger_due_crons(ctx: &dyn AppContextTrait) -> RecorderResult<()> {
|
||||
let db = ctx.db();
|
||||
|
||||
@@ -309,6 +308,55 @@ impl Model {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn check_and_cleanup_expired_cron_locks(
|
||||
ctx: &dyn AppContextTrait,
|
||||
retry_duration: chrono::Duration,
|
||||
) -> RecorderResult<()> {
|
||||
let db = ctx.db();
|
||||
|
||||
let condition = Condition::all()
|
||||
.add(Column::Status.eq(CronStatus::Running))
|
||||
.add(Column::LastRun.is_not_null())
|
||||
.add(Column::TimeoutMs.is_not_null())
|
||||
.add(
|
||||
Expr::col(Column::LastRun)
|
||||
.add(Expr::col(Column::TimeoutMs).mul(Expr::cust("INTERVAL '1 millisecond'")))
|
||||
.lte(Expr::current_timestamp()),
|
||||
);
|
||||
|
||||
let cron_ids = Entity::find()
|
||||
.select_only()
|
||||
.column(Column::Id)
|
||||
.filter(condition.clone())
|
||||
.lock_with_behavior(LockType::Update, LockBehavior::SkipLocked)
|
||||
.into_tuple::<i32>()
|
||||
.all(db)
|
||||
.await?;
|
||||
|
||||
for cron_id in cron_ids {
|
||||
let txn = db.begin().await?;
|
||||
let locked_cron = Entity::find_by_id(cron_id)
|
||||
.filter(condition.clone())
|
||||
.lock_with_behavior(LockType::Update, LockBehavior::SkipLocked)
|
||||
.one(&txn)
|
||||
.await?;
|
||||
|
||||
if let Some(locked_cron) = locked_cron {
|
||||
locked_cron
|
||||
.mark_cron_failed(
|
||||
ctx,
|
||||
format!("Cron timeout of {}ms", locked_cron.timeout_ms).as_str(),
|
||||
retry_duration,
|
||||
)
|
||||
.await?;
|
||||
txn.commit().await?;
|
||||
} else {
|
||||
txn.rollback().await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn calculate_next_run(cron_expr: &str) -> RecorderResult<DateTime<Utc>> {
|
||||
let cron_expr = Cron::new(cron_expr).parse()?;
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use async_trait::async_trait;
|
||||
use sea_orm::{
|
||||
ActiveModelTrait, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, Insert, IntoActiveModel,
|
||||
Iterable, QueryResult, QueryTrait, SelectModel, SelectorRaw, sea_query::Query,
|
||||
QueryResult, QueryTrait, sea_query::Query,
|
||||
};
|
||||
|
||||
#[async_trait]
|
||||
@@ -10,13 +10,6 @@ where
|
||||
<A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
|
||||
A: ActiveModelTrait,
|
||||
{
|
||||
fn exec_with_returning_models<C>(
|
||||
self,
|
||||
db: &C,
|
||||
) -> SelectorRaw<SelectModel<<A::Entity as EntityTrait>::Model>>
|
||||
where
|
||||
C: ConnectionTrait;
|
||||
|
||||
async fn exec_with_returning_columns<C, I>(
|
||||
self,
|
||||
db: &C,
|
||||
@@ -33,26 +26,6 @@ where
|
||||
<A::Entity as EntityTrait>::Model: IntoActiveModel<A>,
|
||||
A: ActiveModelTrait + Send,
|
||||
{
|
||||
fn exec_with_returning_models<C>(
|
||||
self,
|
||||
db: &C,
|
||||
) -> SelectorRaw<SelectModel<<A::Entity as EntityTrait>::Model>>
|
||||
where
|
||||
C: ConnectionTrait,
|
||||
{
|
||||
let mut insert_statement = self.into_query();
|
||||
let db_backend = db.get_database_backend();
|
||||
let returning = Query::returning().exprs(
|
||||
<A::Entity as EntityTrait>::Column::iter()
|
||||
.map(|c| c.select_as(c.into_returning_expr(db_backend))),
|
||||
);
|
||||
insert_statement.returning(returning);
|
||||
let insert_statement = db_backend.build(&insert_statement);
|
||||
SelectorRaw::<SelectModel<<A::Entity as EntityTrait>::Model>>::from_statement(
|
||||
insert_statement,
|
||||
)
|
||||
}
|
||||
|
||||
async fn exec_with_returning_columns<C, I>(
|
||||
self,
|
||||
db: &C,
|
||||
|
||||
Reference in New Issue
Block a user