temp save
This commit is contained in:
@@ -6,8 +6,9 @@ use apalis_sql::{
|
||||
context::SqlContext,
|
||||
postgres::{PgListen as ApalisPgListen, PostgresStorage as ApalisPostgresStorage},
|
||||
};
|
||||
use sea_orm::sqlx::postgres::PgListener;
|
||||
use sea_orm::{ActiveModelTrait, sqlx::postgres::PgListener};
|
||||
use tokio::sync::RwLock;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
app::AppContextTrait,
|
||||
@@ -53,7 +54,7 @@ impl TaskService {
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
cron_worker_id: nanoid::nanoid!(),
|
||||
cron_worker_id: Uuid::now_v7().to_string(),
|
||||
ctx,
|
||||
subscriber_task_storage: Arc::new(RwLock::new(subscriber_task_storage)),
|
||||
system_task_storage: Arc::new(RwLock::new(system_task_storage)),
|
||||
@@ -136,6 +137,21 @@ impl TaskService {
|
||||
Ok(task_id)
|
||||
}
|
||||
|
||||
pub async fn add_subscriber_task_cron(
|
||||
&self,
|
||||
cm: cron::ActiveModel,
|
||||
) -> RecorderResult<cron::Model> {
|
||||
let db = self.ctx.db();
|
||||
let m = cm.insert(db).await?;
|
||||
Ok(m)
|
||||
}
|
||||
|
||||
pub async fn add_system_task_cron(&self, cm: cron::ActiveModel) -> RecorderResult<cron::Model> {
|
||||
let db = self.ctx.db();
|
||||
let m = cm.insert(db).await?;
|
||||
Ok(m)
|
||||
}
|
||||
|
||||
pub async fn run<F, Fut>(&self, shutdown_signal: Option<F>) -> RecorderResult<()>
|
||||
where
|
||||
F: Fn() -> Fut + Send + 'static,
|
||||
@@ -167,45 +183,48 @@ impl TaskService {
|
||||
Ok::<_, RecorderError>(())
|
||||
},
|
||||
async {
|
||||
let listener = self.setup_cron_due_listening().await?;
|
||||
let ctx = self.ctx.clone();
|
||||
let mut listener = self.setup_cron_due_listening().await?;
|
||||
let cron_worker_id = self.cron_worker_id.clone();
|
||||
let retry_duration = chrono::Duration::milliseconds(
|
||||
self.config.cron_retry_duration.as_millis() as i64,
|
||||
);
|
||||
let cron_interval_duration = self.config.cron_interval_duration;
|
||||
listener.listen(CRON_DUE_EVENT).await?;
|
||||
tracing::debug!("Listening for cron due event...");
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(e) =
|
||||
Self::listen_cron_due(listener, ctx, &cron_worker_id, retry_duration).await
|
||||
{
|
||||
tracing::error!("Error listening to cron due: {e}");
|
||||
tokio::task::spawn({
|
||||
let ctx = self.ctx.clone();
|
||||
async move {
|
||||
if let Err(e) =
|
||||
Self::listen_cron_due(listener, ctx, &cron_worker_id, retry_duration)
|
||||
.await
|
||||
{
|
||||
tracing::error!("Error listening to cron due: {e}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok::<_, RecorderError>(())
|
||||
},
|
||||
async {
|
||||
let ctx = self.ctx.clone();
|
||||
let retry_duration = chrono::Duration::milliseconds(
|
||||
self.config.cron_retry_duration.as_millis() as i64,
|
||||
);
|
||||
tokio::task::spawn(async move {
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
if let Err(e) = cron::Model::check_and_cleanup_expired_cron_locks(
|
||||
ctx.as_ref(),
|
||||
retry_duration,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
"Error checking and cleaning up expired cron locks: {e}"
|
||||
);
|
||||
}
|
||||
if let Err(e) = cron::Model::check_and_trigger_due_crons(ctx.as_ref()).await
|
||||
{
|
||||
tracing::error!("Error checking and triggering due crons: {e}");
|
||||
tokio::task::spawn({
|
||||
let ctx = self.ctx.clone();
|
||||
async move {
|
||||
let mut interval = tokio::time::interval(cron_interval_duration);
|
||||
loop {
|
||||
interval.tick().await;
|
||||
if let Err(e) = cron::Model::check_and_cleanup_expired_cron_locks(
|
||||
ctx.as_ref(),
|
||||
retry_duration,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
"Error checking and cleaning up expired cron locks: {e}"
|
||||
);
|
||||
}
|
||||
if let Err(e) =
|
||||
cron::Model::check_and_trigger_due_crons(ctx.as_ref()).await
|
||||
{
|
||||
tracing::error!("Error checking and triggering due crons: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -267,6 +286,7 @@ impl TaskService {
|
||||
async fn setup_cron_due_listening(&self) -> RecorderResult<PgListener> {
|
||||
let pool = self.ctx.db().get_postgres_connection_pool().clone();
|
||||
let listener = PgListener::connect_with(&pool).await?;
|
||||
tracing::debug!("Cron due listener connected to postgres");
|
||||
|
||||
Ok(listener)
|
||||
}
|
||||
@@ -277,10 +297,9 @@ impl TaskService {
|
||||
worker_id: &str,
|
||||
retry_duration: chrono::Duration,
|
||||
) -> RecorderResult<()> {
|
||||
listener.listen(CRON_DUE_EVENT).await?;
|
||||
|
||||
loop {
|
||||
let notification = listener.recv().await?;
|
||||
tracing::debug!("Received cron due event: {:?}", notification);
|
||||
if let Err(e) = cron::Model::handle_cron_notification(
|
||||
ctx.as_ref(),
|
||||
notification,
|
||||
@@ -298,13 +317,20 @@ impl TaskService {
|
||||
#[cfg(test)]
|
||||
#[allow(unused_variables)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use rstest::{fixture, rstest};
|
||||
use sea_orm::ActiveValue;
|
||||
use tracing::Level;
|
||||
|
||||
use super::*;
|
||||
use crate::test_utils::{
|
||||
// app::TestingPreset,
|
||||
tracing::try_init_testing_tracing,
|
||||
use crate::{
|
||||
models::cron,
|
||||
task::EchoTask,
|
||||
test_utils::{
|
||||
app::{TestingAppContextConfig, TestingPreset},
|
||||
tracing::try_init_testing_tracing,
|
||||
},
|
||||
};
|
||||
|
||||
#[fixture]
|
||||
@@ -314,7 +340,40 @@ mod tests {
|
||||
|
||||
#[rstest]
|
||||
#[tokio::test]
|
||||
// #[tracing_test::traced_test]
|
||||
async fn test_cron_due_listening(before_each: ()) -> RecorderResult<()> {
|
||||
todo!()
|
||||
let preset = TestingPreset::default_with_config(
|
||||
TestingAppContextConfig::builder()
|
||||
.task_config(TaskConfig {
|
||||
cron_interval_duration: Duration::from_secs(1),
|
||||
..Default::default()
|
||||
})
|
||||
.build(),
|
||||
)
|
||||
.await?;
|
||||
let app_ctx = preset.app_ctx;
|
||||
let task_service = app_ctx.task();
|
||||
|
||||
let task_id = Uuid::now_v7().to_string();
|
||||
|
||||
let echo_cron = cron::ActiveModel {
|
||||
cron_expr: ActiveValue::Set("*/1 * * * * *".to_string()),
|
||||
system_task_cron: ActiveValue::Set(Some(
|
||||
EchoTask::builder().task_id(task_id.clone()).build().into(),
|
||||
)),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let _ = task_service
|
||||
.run(Some(async move || {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||
}))
|
||||
.await;
|
||||
|
||||
// assert!(logs_contain(&format!(
|
||||
// "EchoTask {task_id} start running at"
|
||||
// )));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user