diff --git a/apps/recorder/src/graphql/infra/crypto.rs b/apps/recorder/src/graphql/infra/crypto.rs index 4b1fd0a..f17cb9c 100644 --- a/apps/recorder/src/graphql/infra/crypto.rs +++ b/apps/recorder/src/graphql/infra/crypto.rs @@ -4,10 +4,7 @@ use async_graphql::dynamic::{ResolverContext, ValueAccessor}; use sea_orm::{EntityTrait, Value as SeaValue}; use seaography::{BuilderContext, SeaResult}; -use crate::{ - app::AppContextTrait, - graphql::infra::name::{get_column_name, get_entity_name}, -}; +use crate::{app::AppContextTrait, graphql::infra::name::get_entity_and_column_name}; pub fn register_crypto_column_input_conversion_to_schema_context( context: &mut BuilderContext, @@ -17,13 +14,8 @@ pub fn register_crypto_column_input_conversion_to_schema_context( T: EntityTrait, ::Model: Sync, { - let entity_key = get_entity_name::(context); - let column_name = get_column_name::(context, column); - let entity_name = context.entity_object.type_name.as_ref()(&entity_key); - let column_name = context.entity_object.column_name.as_ref()(&entity_key, &column_name); - context.types.input_conversions.insert( - format!("{entity_name}.{column_name}"), + get_entity_and_column_name::(context, column), Box::new( move |_resolve_context: &ResolverContext<'_>, value: &ValueAccessor| @@ -44,13 +36,8 @@ pub fn register_crypto_column_output_conversion_to_schema_context( T: EntityTrait, ::Model: Sync, { - let entity_key = get_entity_name::(context); - let column_name = get_column_name::(context, column); - let entity_name = context.entity_object.type_name.as_ref()(&entity_key); - let column_name = context.entity_object.column_name.as_ref()(&entity_key, &column_name); - context.types.output_conversions.insert( - format!("{entity_name}.{column_name}"), + get_entity_and_column_name::(context, column), Box::new( move |value: &sea_orm::Value| -> SeaResult { if let SeaValue::String(s) = value { diff --git a/apps/recorder/src/migrations/m20250629_065628_add_cron.rs b/apps/recorder/src/migrations/m20250629_065628_add_cron.rs index 1695bce..410d4e6 100644 --- a/apps/recorder/src/migrations/m20250629_065628_add_cron.rs +++ b/apps/recorder/src/migrations/m20250629_065628_add_cron.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use sea_orm::ActiveEnum; use sea_orm_migration::{prelude::*, schema::*}; use crate::{ @@ -6,7 +7,6 @@ use crate::{ Cron, CustomSchemaManagerExt, GeneralIds, Subscribers, Subscriptions, table_auto_z, }, models::cron::{ - CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, CronStatus, CronStatusEnum, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME, @@ -151,7 +151,7 @@ impl MigrationTrait for Migration { locked_at = &Cron::LockedAt.to_string(), timeout_ms = &Cron::TimeoutMs.to_string(), status = &Cron::Status.to_string(), - pending = &CronStatus::Pending.to_string(), + pending = &CronStatus::Pending.to_value(), attempts = &Cron::Attempts.to_string(), max_attempts = &Cron::MaxAttempts.to_string(), )) @@ -166,35 +166,6 @@ impl MigrationTrait for Migration { )) .await?; - db.execute_unprepared(&format!( - r#"CREATE OR REPLACE FUNCTION {CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME}() RETURNS INTEGER AS $$ - DECLARE - affected_count INTEGER; - BEGIN - UPDATE {table} - SET - {locked_by} = NULL, - {locked_at} = NULL, - {status} = '{pending}' - WHERE - {locked_by} IS NOT NULL - AND {timeout_ms} IS NOT NULL - AND {locked_at} + {timeout_ms} * INTERVAL '1 millisecond' <= CURRENT_TIMESTAMP - AND {status} = '{running}'; - GET DIAGNOSTICS affected_count = ROW_COUNT; - RETURN affected_count; - END; - $$ LANGUAGE plpgsql;"#, - table = &Cron::Table.to_string(), - locked_by = &Cron::LockedBy.to_string(), - locked_at = &Cron::LockedAt.to_string(), - status = &Cron::Status.to_string(), - running = &CronStatus::Running.to_string(), - pending = &CronStatus::Pending.to_string(), - timeout_ms = &Cron::TimeoutMs.to_string(), - )) - .await?; - db.execute_unprepared(&format!( r#"CREATE OR REPLACE FUNCTION {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}() RETURNS INTEGER AS $$ DECLARE @@ -228,7 +199,7 @@ impl MigrationTrait for Migration { next_run = &Cron::NextRun.to_string(), enabled = &Cron::Enabled.to_string(), status = &Cron::Status.to_string(), - pending = &CronStatus::Pending.to_string(), + pending = &CronStatus::Pending.to_value(), locked_at = &Cron::LockedAt.to_string(), timeout_ms = &Cron::TimeoutMs.to_string(), priority = &Cron::Priority.to_string(), @@ -254,11 +225,6 @@ impl MigrationTrait for Migration { )) .await?; - db.execute_unprepared(&format!( - r#"DROP FUNCTION IF EXISTS {CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME}();"#, - )) - .await?; - db.execute_unprepared(&format!( r#"DROP FUNCTION IF EXISTS {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}();"#, )) diff --git a/apps/recorder/src/models/cron/core.rs b/apps/recorder/src/models/cron/core.rs index bbca05a..5af92fe 100644 --- a/apps/recorder/src/models/cron/core.rs +++ b/apps/recorder/src/models/cron/core.rs @@ -1,9 +1,5 @@ -use serde::{Deserialize, Serialize}; - pub const CRON_DUE_EVENT: &str = "cron_due"; -pub const CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME: &str = - "check_and_cleanup_expired_cron_locks"; pub const CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME: &str = "check_and_trigger_due_crons"; pub const NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME: &str = "notify_due_cron_when_mutating"; @@ -12,12 +8,3 @@ pub const NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME: &str = pub const SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME: &str = "setup_cron_extra_foreign_keys"; pub const SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME: &str = "setup_cron_extra_foreign_keys_trigger"; - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct CronCreateOptions { - pub cron_expr: String, - pub priority: Option, - pub timeout_ms: Option, - pub max_attempts: Option, - pub enabled: Option, -} diff --git a/apps/recorder/src/models/cron/mod.rs b/apps/recorder/src/models/cron/mod.rs index 8588d29..fa8cf0a 100644 --- a/apps/recorder/src/models/cron/mod.rs +++ b/apps/recorder/src/models/cron/mod.rs @@ -2,23 +2,28 @@ mod core; mod registry; pub use core::{ - CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, - CRON_DUE_EVENT, CronCreateOptions, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, - NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME, - SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME, + CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, + NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, + SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME, }; use async_trait::async_trait; use chrono::{DateTime, Utc}; use croner::Cron; use sea_orm::{ - ActiveValue::Set, DeriveActiveEnum, DeriveDisplay, DeriveEntityModel, EnumIter, QuerySelect, - Statement, TransactionTrait, entity::prelude::*, sea_query::LockType, + ActiveValue::{self, Set}, + Condition, DeriveActiveEnum, DeriveDisplay, DeriveEntityModel, EnumIter, QuerySelect, + Statement, TransactionTrait, + entity::prelude::*, + sea_query::{ExprTrait, LockBehavior, LockType}, sqlx::postgres::PgNotification, }; use serde::{Deserialize, Serialize}; -use crate::{app::AppContextTrait, errors::RecorderResult, models::subscriber_tasks}; +use crate::{ + app::AppContextTrait, errors::RecorderResult, models::subscriber_tasks, + task::SubscriberTaskTrait, +}; #[derive( Debug, Clone, PartialEq, Eq, DeriveActiveEnum, EnumIter, DeriveDisplay, Serialize, Deserialize, @@ -107,46 +112,47 @@ pub enum RelatedEntity { Subscription, } -impl ActiveModel { - pub fn from_subscriber_task( - subscriber_task: subscriber_tasks::SubscriberTask, - cron_options: CronCreateOptions, - ) -> RecorderResult { - let mut active_model = Self { - next_run: Set(Some(Model::calculate_next_run(&cron_options.cron_expr)?)), - cron_expr: Set(cron_options.cron_expr), - subscriber_task: Set(Some(subscriber_task)), - ..Default::default() - }; - - if let Some(priority) = cron_options.priority { - active_model.priority = Set(priority); +#[async_trait] +impl ActiveModelBehavior for ActiveModel { + async fn before_save(mut self, _db: &C, _insert: bool) -> Result + where + C: ConnectionTrait, + { + if let ActiveValue::Set(ref cron_expr) = self.cron_expr + && matches!( + self.next_run, + ActiveValue::NotSet | ActiveValue::Unchanged(_) + ) + { + let next_run = + Model::calculate_next_run(cron_expr).map_err(|e| DbErr::Custom(e.to_string()))?; + self.next_run = Set(Some(next_run)); + } + if let ActiveValue::Set(Some(subscriber_id)) = self.subscriber_id { + if let ActiveValue::Set(Some(ref subscriber_task)) = self.subscriber_task { + if subscriber_task.get_subscriber_id() != subscriber_id { + return Err(DbErr::Custom( + "Subscriber task subscriber_id does not match cron subscriber_id" + .to_string(), + )); + } + } else { + return Err(DbErr::Custom( + "Cron subscriber_id is set but subscriber_task is not set".to_string(), + )); + } } - if let Some(timeout_ms) = cron_options.timeout_ms { - active_model.timeout_ms = Set(timeout_ms); - } - - if let Some(max_attempts) = cron_options.max_attempts { - active_model.max_attempts = Set(max_attempts); - } - - if let Some(enabled) = cron_options.enabled { - active_model.enabled = Set(enabled); - } - - Ok(active_model) + Ok(self) } } -#[async_trait] -impl ActiveModelBehavior for ActiveModel {} - impl Model { pub async fn handle_cron_notification( ctx: &dyn AppContextTrait, notification: PgNotification, worker_id: &str, + retry_duration: chrono::Duration, ) -> RecorderResult<()> { let payload: Self = serde_json::from_str(notification.payload())?; let cron_id = payload.id; @@ -161,7 +167,8 @@ impl Model { } Err(e) => { tracing::error!("Error executing cron {cron_id}: {e}"); - cron.mark_cron_failed(ctx, &e.to_string()).await?; + cron.mark_cron_failed(ctx, &e.to_string(), retry_duration) + .await?; } }, None => { @@ -183,7 +190,7 @@ impl Model { let txn = db.begin().await?; let cron = Entity::find_by_id(cron_id) - .lock(LockType::Update) + .lock_with_behavior(LockType::Update, LockBehavior::SkipLocked) .one(&txn) .await?; @@ -250,7 +257,12 @@ impl Model { Ok(()) } - async fn mark_cron_failed(&self, ctx: &dyn AppContextTrait, error: &str) -> RecorderResult<()> { + async fn mark_cron_failed( + &self, + ctx: &dyn AppContextTrait, + error: &str, + retry_duration: chrono::Duration, + ) -> RecorderResult<()> { let db = ctx.db(); let should_retry = self.attempts < self.max_attempts; @@ -262,7 +274,7 @@ impl Model { }; let next_run = if should_retry { - Some(Utc::now() + chrono::Duration::seconds(5)) + Some(Utc::now() + retry_duration) } else { Some(Self::calculate_next_run(&self.cron_expr)?) }; @@ -284,19 +296,6 @@ impl Model { Ok(()) } - pub async fn cleanup_expired_locks(ctx: &dyn AppContextTrait) -> RecorderResult { - let db = ctx.db(); - - let result = db - .execute(Statement::from_string( - db.get_database_backend(), - format!("SELECT {CHECK_AND_CLEANUP_EXPIRED_CRON_LOCKS_FUNCTION_NAME}()"), - )) - .await?; - - Ok(result.rows_affected() as i32) - } - pub async fn check_and_trigger_due_crons(ctx: &dyn AppContextTrait) -> RecorderResult<()> { let db = ctx.db(); @@ -309,6 +308,55 @@ impl Model { Ok(()) } + pub async fn check_and_cleanup_expired_cron_locks( + ctx: &dyn AppContextTrait, + retry_duration: chrono::Duration, + ) -> RecorderResult<()> { + let db = ctx.db(); + + let condition = Condition::all() + .add(Column::Status.eq(CronStatus::Running)) + .add(Column::LastRun.is_not_null()) + .add(Column::TimeoutMs.is_not_null()) + .add( + Expr::col(Column::LastRun) + .add(Expr::col(Column::TimeoutMs).mul(Expr::cust("INTERVAL '1 millisecond'"))) + .lte(Expr::current_timestamp()), + ); + + let cron_ids = Entity::find() + .select_only() + .column(Column::Id) + .filter(condition.clone()) + .lock_with_behavior(LockType::Update, LockBehavior::SkipLocked) + .into_tuple::() + .all(db) + .await?; + + for cron_id in cron_ids { + let txn = db.begin().await?; + let locked_cron = Entity::find_by_id(cron_id) + .filter(condition.clone()) + .lock_with_behavior(LockType::Update, LockBehavior::SkipLocked) + .one(&txn) + .await?; + + if let Some(locked_cron) = locked_cron { + locked_cron + .mark_cron_failed( + ctx, + format!("Cron timeout of {}ms", locked_cron.timeout_ms).as_str(), + retry_duration, + ) + .await?; + txn.commit().await?; + } else { + txn.rollback().await?; + } + } + Ok(()) + } + pub fn calculate_next_run(cron_expr: &str) -> RecorderResult> { let cron_expr = Cron::new(cron_expr).parse()?; diff --git a/apps/recorder/src/models/query/mod.rs b/apps/recorder/src/models/query/mod.rs index ce7fe5a..2d1c548 100644 --- a/apps/recorder/src/models/query/mod.rs +++ b/apps/recorder/src/models/query/mod.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use sea_orm::{ ActiveModelTrait, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, Insert, IntoActiveModel, - Iterable, QueryResult, QueryTrait, SelectModel, SelectorRaw, sea_query::Query, + QueryResult, QueryTrait, sea_query::Query, }; #[async_trait] @@ -10,13 +10,6 @@ where ::Model: IntoActiveModel, A: ActiveModelTrait, { - fn exec_with_returning_models( - self, - db: &C, - ) -> SelectorRaw::Model>> - where - C: ConnectionTrait; - async fn exec_with_returning_columns( self, db: &C, @@ -33,26 +26,6 @@ where ::Model: IntoActiveModel, A: ActiveModelTrait + Send, { - fn exec_with_returning_models( - self, - db: &C, - ) -> SelectorRaw::Model>> - where - C: ConnectionTrait, - { - let mut insert_statement = self.into_query(); - let db_backend = db.get_database_backend(); - let returning = Query::returning().exprs( - ::Column::iter() - .map(|c| c.select_as(c.into_returning_expr(db_backend))), - ); - insert_statement.returning(returning); - let insert_statement = db_backend.build(&insert_statement); - SelectorRaw::::Model>>::from_statement( - insert_statement, - ) - } - async fn exec_with_returning_columns( self, db: &C, diff --git a/apps/recorder/src/task/config.rs b/apps/recorder/src/task/config.rs index 7d8379e..3b2c18d 100644 --- a/apps/recorder/src/task/config.rs +++ b/apps/recorder/src/task/config.rs @@ -8,10 +8,12 @@ pub struct TaskConfig { pub subscriber_task_concurrency: u32, #[serde(default = "default_system_task_workers")] pub system_task_concurrency: u32, - #[serde(default = "default_subscriber_task_timeout")] - pub subscriber_task_timeout: Duration, - #[serde(default = "default_system_task_timeout")] - pub system_task_timeout: Duration, + #[serde(default = "default_subscriber_task_reenqueue_orphaned_after")] + pub subscriber_task_reenqueue_orphaned_after: Duration, + #[serde(default = "default_system_task_reenqueue_orphaned_after")] + pub system_task_reenqueue_orphaned_after: Duration, + #[serde(default = "default_cron_retry_duration")] + pub cron_retry_duration: Duration, } impl Default for TaskConfig { @@ -19,8 +21,10 @@ impl Default for TaskConfig { Self { subscriber_task_concurrency: default_subscriber_task_workers(), system_task_concurrency: default_system_task_workers(), - subscriber_task_timeout: default_subscriber_task_timeout(), - system_task_timeout: default_system_task_timeout(), + subscriber_task_reenqueue_orphaned_after: + default_subscriber_task_reenqueue_orphaned_after(), + system_task_reenqueue_orphaned_after: default_system_task_reenqueue_orphaned_after(), + cron_retry_duration: default_cron_retry_duration(), } } } @@ -41,10 +45,14 @@ pub fn default_system_task_workers() -> u32 { } } -pub fn default_subscriber_task_timeout() -> Duration { +pub fn default_subscriber_task_reenqueue_orphaned_after() -> Duration { Duration::from_secs(3600) } -pub fn default_system_task_timeout() -> Duration { +pub fn default_system_task_reenqueue_orphaned_after() -> Duration { Duration::from_secs(3600) } + +pub fn default_cron_retry_duration() -> Duration { + Duration::from_secs(5) +} diff --git a/apps/recorder/src/task/service.rs b/apps/recorder/src/task/service.rs index 1acccf1..09e1f91 100644 --- a/apps/recorder/src/task/service.rs +++ b/apps/recorder/src/task/service.rs @@ -6,13 +6,13 @@ use apalis_sql::{ context::SqlContext, postgres::{PgListen as ApalisPgListen, PostgresStorage as ApalisPostgresStorage}, }; -use sea_orm::{ActiveModelTrait, sqlx::postgres::PgListener}; +use sea_orm::sqlx::postgres::PgListener; use tokio::sync::RwLock; use crate::{ app::AppContextTrait, errors::{RecorderError, RecorderResult}, - models::cron::{self, CRON_DUE_EVENT, CronCreateOptions}, + models::cron::{self, CRON_DUE_EVENT}, task::{ AsyncTaskTrait, SUBSCRIBER_TASK_APALIS_NAME, SYSTEM_TASK_APALIS_NAME, SubscriberTask, TaskConfig, @@ -42,10 +42,10 @@ impl TaskService { }; let pool = ctx.db().get_postgres_connection_pool().clone(); - let subscriber_task_storage_config = - Config::new(SUBSCRIBER_TASK_APALIS_NAME).set_keep_alive(config.subscriber_task_timeout); - let system_task_storage_config = - Config::new(SYSTEM_TASK_APALIS_NAME).set_keep_alive(config.system_task_timeout); + let subscriber_task_storage_config = Config::new(SUBSCRIBER_TASK_APALIS_NAME) + .set_reenqueue_orphaned_after(config.subscriber_task_reenqueue_orphaned_after); + let system_task_storage_config = Config::new(SYSTEM_TASK_APALIS_NAME) + .set_reenqueue_orphaned_after(config.system_task_reenqueue_orphaned_after); let subscriber_task_storage = ApalisPostgresStorage::new_with_config(pool.clone(), subscriber_task_storage_config); let system_task_storage = @@ -121,18 +121,6 @@ impl TaskService { Ok(task_id) } - pub async fn add_subscriber_task_cron( - &self, - subscriber_task: SubscriberTask, - cron_options: CronCreateOptions, - ) -> RecorderResult { - let c = cron::ActiveModel::from_subscriber_task(subscriber_task, cron_options)?; - - let c = c.insert(self.ctx.db()).await?; - - Ok(c) - } - pub async fn add_system_task(&self, system_task: SystemTask) -> RecorderResult { let task_id = { let mut storage = self.system_task_storage.write().await; @@ -182,9 +170,14 @@ impl TaskService { let listener = self.setup_cron_due_listening().await?; let ctx = self.ctx.clone(); let cron_worker_id = self.cron_worker_id.clone(); + let retry_duration = chrono::Duration::milliseconds( + self.config.cron_retry_duration.as_millis() as i64, + ); tokio::task::spawn(async move { - if let Err(e) = Self::listen_cron_due(listener, ctx, &cron_worker_id).await { + if let Err(e) = + Self::listen_cron_due(listener, ctx, &cron_worker_id, retry_duration).await + { tracing::error!("Error listening to cron due: {e}"); } }); @@ -192,13 +185,23 @@ impl TaskService { Ok::<_, RecorderError>(()) }, async { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); let ctx = self.ctx.clone(); + let retry_duration = chrono::Duration::milliseconds( + self.config.cron_retry_duration.as_millis() as i64, + ); tokio::task::spawn(async move { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); loop { interval.tick().await; - if let Err(e) = cron::Model::cleanup_expired_locks(ctx.as_ref()).await { - tracing::error!("Error cleaning up expired locks: {e}"); + if let Err(e) = cron::Model::check_and_cleanup_expired_cron_locks( + ctx.as_ref(), + retry_duration, + ) + .await + { + tracing::error!( + "Error checking and cleaning up expired cron locks: {e}" + ); } if let Err(e) = cron::Model::check_and_trigger_due_crons(ctx.as_ref()).await { @@ -272,12 +275,19 @@ impl TaskService { mut listener: PgListener, ctx: Arc, worker_id: &str, + retry_duration: chrono::Duration, ) -> RecorderResult<()> { listener.listen(CRON_DUE_EVENT).await?; + loop { let notification = listener.recv().await?; - if let Err(e) = - cron::Model::handle_cron_notification(ctx.as_ref(), notification, worker_id).await + if let Err(e) = cron::Model::handle_cron_notification( + ctx.as_ref(), + notification, + worker_id, + retry_duration, + ) + .await { tracing::error!("Error handling cron notification: {e}"); } diff --git a/apps/webui/src/presentation/routes/_app/subscriptions/detail.$id.tsx b/apps/webui/src/presentation/routes/_app/subscriptions/detail.$id.tsx index b3c289e..5160462 100644 --- a/apps/webui/src/presentation/routes/_app/subscriptions/detail.$id.tsx +++ b/apps/webui/src/presentation/routes/_app/subscriptions/detail.$id.tsx @@ -212,20 +212,6 @@ function SubscriptionDetailRouteComponent() { View subscription detail -
- -
@@ -439,18 +425,32 @@ function SubscriptionDetailRouteComponent() {
- - - - - - +
+ + + + + + + +
{subscription.subscriberTask?.nodes &&