fix: fix task lifetime

This commit is contained in:
2025-06-28 04:10:18 +08:00
parent c858cc7d44
commit f83371bbf9
3 changed files with 39 additions and 38 deletions

View File

@@ -8,10 +8,10 @@ pub struct TaskConfig {
pub subscriber_task_concurrency: u32,
#[serde(default = "default_system_task_workers")]
pub system_task_concurrency: u32,
#[serde(default = "default_subscriber_task_timeout")]
pub subscriber_task_timeout: Duration,
#[serde(default = "default_system_task_timeout")]
pub system_task_timeout: Duration,
#[serde(default = "default_subscriber_task_reenqueue_orphaned_after")]
pub subscriber_task_reenqueue_orphaned_after: Duration,
#[serde(default = "default_system_task_reenqueue_orphaned_after")]
pub system_task_reenqueue_orphaned_after: Duration,
#[serde(default = "default_cron_retry_duration")]
pub cron_retry_duration: Duration,
}
@@ -21,8 +21,9 @@ impl Default for TaskConfig {
Self {
subscriber_task_concurrency: default_subscriber_task_workers(),
system_task_concurrency: default_system_task_workers(),
subscriber_task_timeout: default_subscriber_task_timeout(),
system_task_timeout: default_system_task_timeout(),
subscriber_task_reenqueue_orphaned_after:
default_subscriber_task_reenqueue_orphaned_after(),
system_task_reenqueue_orphaned_after: default_system_task_reenqueue_orphaned_after(),
cron_retry_duration: default_cron_retry_duration(),
}
}
@@ -44,11 +45,11 @@ pub fn default_system_task_workers() -> u32 {
}
}
pub fn default_subscriber_task_timeout() -> Duration {
pub fn default_subscriber_task_reenqueue_orphaned_after() -> Duration {
Duration::from_secs(3600)
}
pub fn default_system_task_timeout() -> Duration {
pub fn default_system_task_reenqueue_orphaned_after() -> Duration {
Duration::from_secs(3600)
}

View File

@@ -41,10 +41,10 @@ impl TaskService {
};
let pool = ctx.db().get_postgres_connection_pool().clone();
let subscriber_task_storage_config =
Config::new(SUBSCRIBER_TASK_APALIS_NAME).set_keep_alive(config.subscriber_task_timeout);
let system_task_storage_config =
Config::new(SYSTEM_TASK_APALIS_NAME).set_keep_alive(config.system_task_timeout);
let subscriber_task_storage_config = Config::new(SUBSCRIBER_TASK_APALIS_NAME)
.set_reenqueue_orphaned_after(config.subscriber_task_reenqueue_orphaned_after);
let system_task_storage_config = Config::new(SYSTEM_TASK_APALIS_NAME)
.set_reenqueue_orphaned_after(config.system_task_reenqueue_orphaned_after);
let subscriber_task_storage =
ApalisPostgresStorage::new_with_config(pool.clone(), subscriber_task_storage_config);
let system_task_storage =