feat: more cron webui
This commit is contained in:
@@ -169,6 +169,7 @@ croner = "2.2.0"
|
||||
ts-rs = "11.0.1"
|
||||
secrecy = { version = "0.10.3", features = ["serde"] }
|
||||
paste = "1.0.15"
|
||||
chrono-tz = "0.10.3"
|
||||
|
||||
[dev-dependencies]
|
||||
inquire = { workspace = true }
|
||||
|
||||
@@ -18,6 +18,8 @@ use crate::{
|
||||
#[derive(Snafu, Debug)]
|
||||
#[snafu(visibility(pub(crate)))]
|
||||
pub enum RecorderError {
|
||||
#[snafu(transparent)]
|
||||
ChronoTzParseError { source: chrono_tz::ParseError },
|
||||
#[snafu(transparent)]
|
||||
SeaographyError { source: seaography::SeaographyError },
|
||||
#[snafu(transparent)]
|
||||
|
||||
@@ -20,6 +20,7 @@ fn skip_columns_for_entity_input(context: &mut BuilderContext) {
|
||||
cron::Column::SubscriberTaskCron
|
||||
| cron::Column::SystemTaskCron
|
||||
| cron::Column::CronExpr
|
||||
| cron::Column::CronTimezone
|
||||
| cron::Column::Enabled
|
||||
| cron::Column::TimeoutMs
|
||||
| cron::Column::MaxAttempts
|
||||
@@ -30,7 +31,8 @@ fn skip_columns_for_entity_input(context: &mut BuilderContext) {
|
||||
context.entity_input.insert_skips.push(entity_column_key);
|
||||
}
|
||||
for column in cron::Column::iter() {
|
||||
if matches!(column, |cron::Column::CronExpr| cron::Column::Enabled
|
||||
if matches!(column, |cron::Column::CronExpr| cron::Column::CronTimezone
|
||||
| cron::Column::Enabled
|
||||
| cron::Column::TimeoutMs
|
||||
| cron::Column::Priority
|
||||
| cron::Column::MaxAttempts)
|
||||
|
||||
@@ -178,6 +178,7 @@ pub enum Cron {
|
||||
SubscriberId,
|
||||
SubscriptionId,
|
||||
CronExpr,
|
||||
CronTimezone,
|
||||
NextRun,
|
||||
LastRun,
|
||||
LastError,
|
||||
|
||||
@@ -40,6 +40,7 @@ impl MigrationTrait for Migration {
|
||||
table_auto_z(Cron::Table)
|
||||
.col(pk_auto(Cron::Id))
|
||||
.col(string(Cron::CronExpr))
|
||||
.col(string(Cron::CronTimezone))
|
||||
.col(integer_null(Cron::SubscriberId))
|
||||
.col(integer_null(Cron::SubscriptionId))
|
||||
.col(timestamp_with_time_zone_null(Cron::NextRun))
|
||||
|
||||
@@ -8,6 +8,7 @@ pub use core::{
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use chrono_tz::Tz;
|
||||
use croner::Cron;
|
||||
use sea_orm::{
|
||||
ActiveValue::{self, Set},
|
||||
@@ -54,6 +55,7 @@ pub struct Model {
|
||||
pub subscriber_id: Option<i32>,
|
||||
pub subscription_id: Option<i32>,
|
||||
pub cron_expr: String,
|
||||
pub cron_timezone: String,
|
||||
pub next_run: Option<DateTimeUtc>,
|
||||
pub last_run: Option<DateTimeUtc>,
|
||||
pub last_error: Option<String>,
|
||||
@@ -140,16 +142,37 @@ impl ActiveModelBehavior for ActiveModel {
|
||||
where
|
||||
C: ConnectionTrait,
|
||||
{
|
||||
if let ActiveValue::Set(ref cron_expr) = self.cron_expr
|
||||
&& matches!(
|
||||
self.next_run,
|
||||
ActiveValue::NotSet | ActiveValue::Unchanged(_)
|
||||
)
|
||||
{
|
||||
let next_run =
|
||||
Model::calculate_next_run(cron_expr).map_err(|e| DbErr::Custom(e.to_string()))?;
|
||||
self.next_run = Set(Some(next_run));
|
||||
}
|
||||
match (
|
||||
&self.cron_expr as &ActiveValue<String>,
|
||||
&self.cron_timezone as &ActiveValue<String>,
|
||||
) {
|
||||
(ActiveValue::Set(cron_expr), ActiveValue::Set(timezone)) => {
|
||||
if matches!(
|
||||
&self.next_run,
|
||||
ActiveValue::NotSet | ActiveValue::Unchanged(_)
|
||||
) {
|
||||
let next_run = Model::calculate_next_run(cron_expr, timezone)
|
||||
.map_err(|e| DbErr::Custom(e.to_string()))?;
|
||||
self.next_run = Set(Some(next_run));
|
||||
}
|
||||
}
|
||||
(
|
||||
ActiveValue::Unchanged(_) | ActiveValue::NotSet,
|
||||
ActiveValue::Unchanged(_) | ActiveValue::NotSet,
|
||||
) => {}
|
||||
(_, _) => {
|
||||
if matches!(
|
||||
self.next_run,
|
||||
ActiveValue::NotSet | ActiveValue::Unchanged(_)
|
||||
) {
|
||||
return Err(DbErr::Custom(
|
||||
"Cron expr and timezone must be insert or update at same time when next \
|
||||
run is not set"
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
};
|
||||
if let ActiveValue::Set(Some(subscriber_id)) = self.subscriber_id
|
||||
&& let ActiveValue::Set(Some(ref subscriber_task)) = self.subscriber_task_cron
|
||||
&& subscriber_task.get_subscriber_id() != subscriber_id
|
||||
@@ -272,7 +295,7 @@ impl Model {
|
||||
async fn mark_cron_completed(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> {
|
||||
let db = ctx.db();
|
||||
|
||||
let next_run = Self::calculate_next_run(&self.cron_expr)?;
|
||||
let next_run = Self::calculate_next_run(&self.cron_expr, &self.cron_timezone)?;
|
||||
|
||||
ActiveModel {
|
||||
id: Set(self.id),
|
||||
@@ -310,7 +333,10 @@ impl Model {
|
||||
let next_run = if should_retry {
|
||||
Some(Utc::now() + retry_duration)
|
||||
} else {
|
||||
Some(Self::calculate_next_run(&self.cron_expr)?)
|
||||
Some(Self::calculate_next_run(
|
||||
&self.cron_expr,
|
||||
&self.cron_timezone,
|
||||
)?)
|
||||
};
|
||||
|
||||
ActiveModel {
|
||||
@@ -399,11 +425,17 @@ impl Model {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn calculate_next_run(cron_expr: &str) -> RecorderResult<DateTime<Utc>> {
|
||||
pub fn calculate_next_run(cron_expr: &str, timezone: &str) -> RecorderResult<DateTime<Utc>> {
|
||||
let user_tz = timezone.parse::<Tz>()?;
|
||||
|
||||
let user_tz_now = Utc::now().with_timezone(&user_tz);
|
||||
|
||||
let cron_expr = Cron::new(cron_expr).with_seconds_optional().parse()?;
|
||||
|
||||
let next = cron_expr.find_next_occurrence(&Utc::now(), false)?;
|
||||
let next = cron_expr.find_next_occurrence(&user_tz_now, false)?;
|
||||
|
||||
Ok(next)
|
||||
let next_utc = next.with_timezone(&Utc);
|
||||
|
||||
Ok(next_utc)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user