From f83371bbf9a3bf451583278ca6155180ff828ba5 Mon Sep 17 00:00:00 2001 From: lonelyhentxi Date: Sat, 28 Jun 2025 04:10:18 +0800 Subject: [PATCH] fix: fix task lifetime --- apps/recorder/src/task/config.rs | 17 +++--- apps/recorder/src/task/service.rs | 8 +-- .../routes/_app/subscriptions/detail.$id.tsx | 52 +++++++++---------- 3 files changed, 39 insertions(+), 38 deletions(-) diff --git a/apps/recorder/src/task/config.rs b/apps/recorder/src/task/config.rs index f01d423..3b2c18d 100644 --- a/apps/recorder/src/task/config.rs +++ b/apps/recorder/src/task/config.rs @@ -8,10 +8,10 @@ pub struct TaskConfig { pub subscriber_task_concurrency: u32, #[serde(default = "default_system_task_workers")] pub system_task_concurrency: u32, - #[serde(default = "default_subscriber_task_timeout")] - pub subscriber_task_timeout: Duration, - #[serde(default = "default_system_task_timeout")] - pub system_task_timeout: Duration, + #[serde(default = "default_subscriber_task_reenqueue_orphaned_after")] + pub subscriber_task_reenqueue_orphaned_after: Duration, + #[serde(default = "default_system_task_reenqueue_orphaned_after")] + pub system_task_reenqueue_orphaned_after: Duration, #[serde(default = "default_cron_retry_duration")] pub cron_retry_duration: Duration, } @@ -21,8 +21,9 @@ impl Default for TaskConfig { Self { subscriber_task_concurrency: default_subscriber_task_workers(), system_task_concurrency: default_system_task_workers(), - subscriber_task_timeout: default_subscriber_task_timeout(), - system_task_timeout: default_system_task_timeout(), + subscriber_task_reenqueue_orphaned_after: + default_subscriber_task_reenqueue_orphaned_after(), + system_task_reenqueue_orphaned_after: default_system_task_reenqueue_orphaned_after(), cron_retry_duration: default_cron_retry_duration(), } } @@ -44,11 +45,11 @@ pub fn default_system_task_workers() -> u32 { } } -pub fn default_subscriber_task_timeout() -> Duration { +pub fn default_subscriber_task_reenqueue_orphaned_after() -> Duration { Duration::from_secs(3600) } -pub fn default_system_task_timeout() -> Duration { +pub fn default_system_task_reenqueue_orphaned_after() -> Duration { Duration::from_secs(3600) } diff --git a/apps/recorder/src/task/service.rs b/apps/recorder/src/task/service.rs index f2492cd..1401865 100644 --- a/apps/recorder/src/task/service.rs +++ b/apps/recorder/src/task/service.rs @@ -41,10 +41,10 @@ impl TaskService { }; let pool = ctx.db().get_postgres_connection_pool().clone(); - let subscriber_task_storage_config = - Config::new(SUBSCRIBER_TASK_APALIS_NAME).set_keep_alive(config.subscriber_task_timeout); - let system_task_storage_config = - Config::new(SYSTEM_TASK_APALIS_NAME).set_keep_alive(config.system_task_timeout); + let subscriber_task_storage_config = Config::new(SUBSCRIBER_TASK_APALIS_NAME) + .set_reenqueue_orphaned_after(config.subscriber_task_reenqueue_orphaned_after); + let system_task_storage_config = Config::new(SYSTEM_TASK_APALIS_NAME) + .set_reenqueue_orphaned_after(config.system_task_reenqueue_orphaned_after); let subscriber_task_storage = ApalisPostgresStorage::new_with_config(pool.clone(), subscriber_task_storage_config); let system_task_storage = diff --git a/apps/webui/src/presentation/routes/_app/subscriptions/detail.$id.tsx b/apps/webui/src/presentation/routes/_app/subscriptions/detail.$id.tsx index b3c289e..5160462 100644 --- a/apps/webui/src/presentation/routes/_app/subscriptions/detail.$id.tsx +++ b/apps/webui/src/presentation/routes/_app/subscriptions/detail.$id.tsx @@ -212,20 +212,6 @@ function SubscriptionDetailRouteComponent() { View subscription detail -
- -
@@ -439,18 +425,32 @@ function SubscriptionDetailRouteComponent() {
- - - - - - +
+ + + + + + + +
{subscription.subscriberTask?.nodes &&