feat: task ui & custom filter mutation

This commit is contained in:
2025-06-16 07:56:52 +08:00
parent 7eb4e41708
commit 421f9d0293
20 changed files with 594 additions and 434 deletions

View File

@@ -6,6 +6,7 @@ use tracing::instrument;
use super::{builder::AppBuilder, context::AppContextTrait};
use crate::{
app::Environment,
errors::{RecorderError, RecorderResult},
web::{
controller::{self, core::ControllerTrait},
@@ -72,7 +73,6 @@ impl App {
.into_make_service_with_connect_info::<SocketAddr>();
let task = context.task();
tokio::try_join!(
async {
axum::serve(listener, router)
@@ -84,15 +84,20 @@ impl App {
Ok::<(), RecorderError>(())
},
async {
let monitor = task.setup_monitor().await?;
monitor
.run_with_signal(async move {
Self::shutdown_signal().await;
tracing::info!("apalis shutting down...");
Ok(())
})
.await?;
{
let monitor = task.setup_monitor().await?;
if matches!(context.environment(), Environment::Development) {
monitor.run().await?;
} else {
monitor
.run_with_signal(async move {
Self::shutdown_signal().await;
tracing::info!("apalis shutting down...");
Ok(())
})
.await?;
}
}
Ok::<(), RecorderError>(())
},

View File

@@ -103,7 +103,7 @@ pub enum RecorderError {
#[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, OptDynErr::some)))]
source: OptDynErr,
},
#[snafu(display("Model Entity {entity} not found"))]
#[snafu(display("Model Entity {entity} not found or not belong to subscriber"))]
ModelEntityNotFound { entity: Cow<'static, str> },
#[snafu(transparent)]
FetchError { source: FetchError },
@@ -123,6 +123,8 @@ pub enum RecorderError {
#[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, OptDynErr::some)))]
source: OptDynErr,
},
#[snafu(display("Invalid task id: {message}"))]
InvalidTaskId { message: String },
}
impl RecorderError {

View File

@@ -1,35 +1,39 @@
use std::{ops::Deref, pin::Pin, sync::Arc};
use std::{ops::Deref, sync::Arc};
use async_graphql::dynamic::{ResolverContext, ValueAccessor};
use async_graphql::dynamic::{FieldValue, TypeRef};
use sea_orm::{
ConnectionTrait, EntityTrait, QueryFilter, QuerySelect, QueryTrait, prelude::Expr,
ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, QuerySelect, QueryTrait, prelude::Expr,
sea_query::Query,
};
use seaography::{Builder as SeaographyBuilder, BuilderContext, get_filter_conditions};
use seaography::{
Builder as SeaographyBuilder, BuilderContext, EntityDeleteMutationBuilder, EntityObjectBuilder,
EntityQueryFieldBuilder, get_filter_conditions,
};
use crate::{
app::AppContextTrait,
errors::{RecorderError, RecorderResult},
errors::RecorderError,
graphql::{
domains::subscribers::restrict_subscriber_for_entity,
infra::{
custom::generate_custom_entity_delete_mutation_field,
custom::generate_entity_filter_mutation_field,
json::{convert_jsonb_output_case_for_entity, restrict_jsonb_filter_input_for_entity},
},
},
models::subscriber_tasks,
task::ApalisJob,
task::{ApalisJobs, ApalisSchema},
};
pub fn register_subscriber_tasks_entity_mutations(builder: &mut SeaographyBuilder) {
pub fn register_subscriber_tasks_entity_mutations(
mut builder: SeaographyBuilder,
) -> SeaographyBuilder {
let context = builder.context;
let delete_mutation = generate_custom_entity_delete_mutation_field::<subscriber_tasks::Entity>(
context,
Arc::new(
|resolver_ctx: &ResolverContext<'_>,
app_ctx: Arc<dyn AppContextTrait>,
filters: Option<ValueAccessor<'_>>|
-> Pin<Box<dyn Future<Output = RecorderResult<Option<i32>>> + Send>> {
{
let entitity_delete_mutation_builder = EntityDeleteMutationBuilder { context };
let delete_mutation = generate_entity_filter_mutation_field::<subscriber_tasks::Entity, _, _>(
context,
entitity_delete_mutation_builder.type_name::<subscriber_tasks::Entity>(),
TypeRef::named_nn(TypeRef::INT),
Arc::new(|resolver_ctx, app_ctx, filters| {
let filters_condition = get_filter_conditions::<subscriber_tasks::Entity>(
resolver_ctx,
context,
@@ -44,23 +48,74 @@ pub fn register_subscriber_tasks_entity_mutations(builder: &mut SeaographyBuilde
.filter(filters_condition);
let delete_query = Query::delete()
.from_table(ApalisJob::Table)
.from_table((ApalisSchema::Schema, ApalisJobs::Table))
.and_where(
Expr::col(ApalisJob::Id).in_subquery(select_subquery.into_query()),
Expr::col(ApalisJobs::Id).in_subquery(select_subquery.into_query()),
)
.to_owned();
let db_backend = db.deref().get_database_backend();
let delete_statement = db_backend.build(&delete_query);
let result = db.execute(delete_statement).await?;
Ok::<Option<i32>, RecorderError>(Some(result.rows_affected() as i32))
Ok::<_, RecorderError>(Some(FieldValue::value(result.rows_affected() as i32)))
})
as Pin<Box<dyn Future<Output = RecorderResult<Option<i32>>> + Send>>
},
),
);
builder.mutations.push(delete_mutation);
}),
);
builder.mutations.push(delete_mutation);
}
{
let entity_object_builder = EntityObjectBuilder { context };
let entity_query_field = EntityQueryFieldBuilder { context };
let entity_retry_one_mutation_name = format!(
"{}RetryOne",
entity_query_field.type_name::<subscriber_tasks::Entity>()
);
let retry_one_mutation =
generate_entity_filter_mutation_field::<subscriber_tasks::Entity, _, _>(
context,
entity_retry_one_mutation_name,
TypeRef::named_nn(entity_object_builder.type_name::<subscriber_tasks::Entity>()),
Arc::new(|resolver_ctx, app_ctx, filters| {
let filters_condition = get_filter_conditions::<subscriber_tasks::Entity>(
resolver_ctx,
context,
filters,
);
Box::pin(async move {
let db = app_ctx.db();
let job_id = subscriber_tasks::Entity::find()
.filter(filters_condition)
.select_only()
.column(subscriber_tasks::Column::Id)
.into_tuple::<String>()
.one(db)
.await?
.ok_or_else(|| RecorderError::ModelEntityNotFound {
entity: "SubscriberTask".into(),
})?;
let task = app_ctx.task();
task.retry_subscriber_task(job_id.clone()).await?;
let task_model = subscriber_tasks::Entity::find()
.filter(subscriber_tasks::Column::Id.eq(&job_id))
.one(db)
.await?
.ok_or_else(|| RecorderError::ModelEntityNotFound {
entity: "SubscriberTask".into(),
})?;
Ok::<_, RecorderError>(Some(FieldValue::owned_any(task_model)))
})
}),
);
builder.mutations.push(retry_one_mutation);
}
builder
}
pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext) {
@@ -89,6 +144,7 @@ pub fn register_subscriber_tasks_to_schema_builder(
builder = builder.register_entity_dataloader_one_to_one(subscriber_tasks::Entity, tokio::spawn);
builder =
builder.register_entity_dataloader_one_to_many(subscriber_tasks::Entity, tokio::spawn);
builder = register_subscriber_tasks_entity_mutations(builder);
builder.register_enumeration::<subscriber_tasks::SubscriberTaskType>();
builder.register_enumeration::<subscriber_tasks::SubscriberTaskStatus>();
builder

View File

@@ -1,104 +1,58 @@
use std::sync::Arc;
use async_graphql::dynamic::{
Field, FieldFuture, FieldValue, InputObject, InputValue, Object, TypeRef,
use async_graphql::dynamic::{FieldValue, TypeRef};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use seaography::{
Builder as SeaographyBuilder, EntityObjectBuilder, EntityQueryFieldBuilder,
get_filter_conditions,
};
use seaography::Builder as SeaographyBuilder;
use serde::{Deserialize, Serialize};
use util_derive::DynamicGraphql;
use crate::{
app::AppContextTrait,
auth::AuthUserInfo,
models::subscriptions::{self, SubscriptionTrait},
errors::RecorderError,
graphql::infra::custom::generate_entity_filter_mutation_field,
models::{
subscriber_tasks,
subscriptions::{self, SubscriptionTrait},
},
task::SubscriberTask,
};
#[derive(DynamicGraphql, Serialize, Deserialize, Clone, Debug)]
struct SyncOneSubscriptionFilterInput {
pub id: i32,
}
impl SyncOneSubscriptionFilterInput {
fn input_type_name() -> &'static str {
"SyncOneSubscriptionFilterInput"
}
fn arg_name() -> &'static str {
"filter"
}
fn generate_input_object() -> InputObject {
InputObject::new(Self::input_type_name())
.description("The input of the subscriptionSyncOne series of mutations")
.field(InputValue::new(
SyncOneSubscriptionFilterInputFieldEnum::Id.as_str(),
TypeRef::named_nn(TypeRef::INT),
))
}
}
#[derive(DynamicGraphql, Serialize, Deserialize, Clone, Debug)]
pub struct SyncOneSubscriptionInfo {
pub task_id: String,
}
impl SyncOneSubscriptionInfo {
fn object_type_name() -> &'static str {
"SyncOneSubscriptionInfo"
}
fn generate_output_object() -> Object {
Object::new(Self::object_type_name())
.description("The output of the subscriptionSyncOne series of mutations")
.field(Field::new(
SyncOneSubscriptionInfoFieldEnum::TaskId,
TypeRef::named_nn(TypeRef::STRING),
move |ctx| {
FieldFuture::new(async move {
let subscription_info = ctx.parent_value.try_downcast_ref::<Self>()?;
Ok(Some(async_graphql::Value::from(
subscription_info.task_id.as_str(),
)))
})
},
))
}
}
pub fn register_subscriptions_to_schema_builder(
mut builder: SeaographyBuilder,
) -> SeaographyBuilder {
builder.schema = builder
.schema
.register(SyncOneSubscriptionFilterInput::generate_input_object());
builder.schema = builder
.schema
.register(SyncOneSubscriptionInfo::generate_output_object());
let context = builder.context;
builder.mutations.push(
Field::new(
"subscriptionSyncOneFeedsIncremental",
TypeRef::named_nn(SyncOneSubscriptionInfo::object_type_name()),
move |ctx| {
FieldFuture::new(async move {
let auth_user_info = ctx.data::<AuthUserInfo>()?;
let entity_object_builder = EntityObjectBuilder { context };
let entity_query_field = EntityQueryFieldBuilder { context };
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
{
let sync_one_feeds_incremental_mutation_name = format!(
"{}SyncOneFeedsIncremental",
entity_query_field.type_name::<subscriptions::Entity>()
);
let filter_input: SyncOneSubscriptionFilterInput = ctx
.args
.get(SyncOneSubscriptionFilterInput::arg_name())
.unwrap()
.deserialize()?;
let sync_one_feeds_incremental_mutation = generate_entity_filter_mutation_field::<
subscriptions::Entity,
_,
_,
>(
builder.context,
sync_one_feeds_incremental_mutation_name,
TypeRef::named_nn(entity_object_builder.type_name::<subscriber_tasks::Entity>()),
Arc::new(|resolver_ctx, app_ctx, filters| {
let filters_condition =
get_filter_conditions::<subscriptions::Entity>(resolver_ctx, context, filters);
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
app_ctx.as_ref(),
filter_input.id,
subscriber_id,
)
.await?;
Box::pin(async move {
let db = app_ctx.db();
let subscription_model = subscriptions::Entity::find()
.filter(filters_condition)
.one(db)
.await?
.ok_or_else(|| RecorderError::ModelEntityNotFound {
entity: "Subscription".into(),
})?;
let subscription =
subscriptions::Subscription::try_from_model(&subscription_model)?;
@@ -107,48 +61,56 @@ pub fn register_subscriptions_to_schema_builder(
let task_id = task_service
.add_subscriber_task(
auth_user_info.subscriber_auth.subscriber_id,
subscription_model.subscriber_id,
SubscriberTask::SyncOneSubscriptionFeedsIncremental(
subscription.into(),
),
)
.await?;
Ok(Some(FieldValue::owned_any(SyncOneSubscriptionInfo {
task_id: task_id.to_string(),
})))
let task_model = subscriber_tasks::Entity::find()
.filter(subscriber_tasks::Column::Id.eq(task_id.to_string()))
.one(db)
.await?
.ok_or_else(|| RecorderError::ModelEntityNotFound {
entity: "SubscriberTask".into(),
})?;
Ok(Some(FieldValue::owned_any(task_model)))
})
},
)
.argument(InputValue::new(
SyncOneSubscriptionFilterInput::arg_name(),
TypeRef::named_nn(SyncOneSubscriptionFilterInput::input_type_name()),
)),
);
}),
);
builder.mutations.push(
Field::new(
"subscriptionSyncOneFeedsFull",
TypeRef::named_nn(SyncOneSubscriptionInfo::object_type_name()),
move |ctx| {
FieldFuture::new(async move {
let auth_user_info = ctx.data::<AuthUserInfo>()?;
builder.mutations.push(sync_one_feeds_incremental_mutation);
}
{
let sync_one_feeds_full_mutation_name = format!(
"{}SyncOneFeedsFull",
entity_query_field.type_name::<subscriptions::Entity>()
);
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
let sync_one_feeds_full_mutation = generate_entity_filter_mutation_field::<
subscriptions::Entity,
_,
_,
>(
builder.context,
sync_one_feeds_full_mutation_name,
TypeRef::named_nn(entity_object_builder.type_name::<subscriber_tasks::Entity>()),
Arc::new(|resolver_ctx, app_ctx, filters| {
let filters_condition =
get_filter_conditions::<subscriptions::Entity>(resolver_ctx, context, filters);
let filter_input: SyncOneSubscriptionFilterInput = ctx
.args
.get(SyncOneSubscriptionFilterInput::arg_name())
.unwrap()
.deserialize()?;
Box::pin(async move {
let db = app_ctx.db();
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
app_ctx.as_ref(),
filter_input.id,
subscriber_id,
)
.await?;
let subscription_model = subscriptions::Entity::find()
.filter(filters_condition)
.one(db)
.await?
.ok_or_else(|| RecorderError::ModelEntityNotFound {
entity: "Subscription".into(),
})?;
let subscription =
subscriptions::Subscription::try_from_model(&subscription_model)?;
@@ -157,46 +119,55 @@ pub fn register_subscriptions_to_schema_builder(
let task_id = task_service
.add_subscriber_task(
auth_user_info.subscriber_auth.subscriber_id,
subscription_model.subscriber_id,
SubscriberTask::SyncOneSubscriptionFeedsFull(subscription.into()),
)
.await?;
Ok(Some(FieldValue::owned_any(SyncOneSubscriptionInfo {
task_id: task_id.to_string(),
})))
let task_model = subscriber_tasks::Entity::find()
.filter(subscriber_tasks::Column::Id.eq(task_id.to_string()))
.one(db)
.await?
.ok_or_else(|| RecorderError::ModelEntityNotFound {
entity: "SubscriberTask".into(),
})?;
Ok(Some(FieldValue::owned_any(task_model)))
})
},
)
.argument(InputValue::new(
SyncOneSubscriptionFilterInput::arg_name(),
TypeRef::named_nn(SyncOneSubscriptionFilterInput::input_type_name()),
)),
);
}),
);
builder.mutations.push(
Field::new(
"subscriptionSyncOneSources",
TypeRef::named_nn(SyncOneSubscriptionInfo::object_type_name()),
move |ctx| {
FieldFuture::new(async move {
let auth_user_info = ctx.data::<AuthUserInfo>()?;
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
builder.mutations.push(sync_one_feeds_full_mutation);
}
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
{
let sync_one_sources_mutation_name = format!(
"{}SyncOneSources",
entity_query_field.type_name::<subscriptions::Entity>()
);
let filter_input: SyncOneSubscriptionFilterInput = ctx
.args
.get(SyncOneSubscriptionFilterInput::arg_name())
.unwrap()
.deserialize()?;
let sync_one_sources_mutation = generate_entity_filter_mutation_field::<
subscriptions::Entity,
_,
_,
>(
builder.context,
sync_one_sources_mutation_name,
TypeRef::named_nn(entity_object_builder.type_name::<subscriber_tasks::Entity>()),
Arc::new(|resolver_ctx, app_ctx, filters| {
let filters_condition =
get_filter_conditions::<subscriptions::Entity>(resolver_ctx, context, filters);
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
app_ctx.as_ref(),
filter_input.id,
subscriber_id,
)
.await?;
Box::pin(async move {
let db = app_ctx.db();
let subscription_model = subscriptions::Entity::find()
.filter(filters_condition)
.one(db)
.await?
.ok_or_else(|| RecorderError::ModelEntityNotFound {
entity: "Subscription".into(),
})?;
let subscription =
subscriptions::Subscription::try_from_model(&subscription_model)?;
@@ -205,22 +176,26 @@ pub fn register_subscriptions_to_schema_builder(
let task_id = task_service
.add_subscriber_task(
auth_user_info.subscriber_auth.subscriber_id,
subscription_model.subscriber_id,
SubscriberTask::SyncOneSubscriptionSources(subscription.into()),
)
.await?;
Ok(Some(FieldValue::owned_any(SyncOneSubscriptionInfo {
task_id: task_id.to_string(),
})))
let task_model = subscriber_tasks::Entity::find()
.filter(subscriber_tasks::Column::Id.eq(task_id.to_string()))
.one(db)
.await?
.ok_or_else(|| RecorderError::ModelEntityNotFound {
entity: "SubscriberTask".into(),
})?;
Ok(Some(FieldValue::owned_any(task_model)))
})
},
)
.argument(InputValue::new(
SyncOneSubscriptionFilterInput::arg_name(),
TypeRef::named_nn(SyncOneSubscriptionFilterInput::input_type_name()),
)),
);
}),
);
builder.mutations.push(sync_one_sources_mutation);
}
builder
}

View File

@@ -1,33 +1,35 @@
use std::{pin::Pin, sync::Arc};
use async_graphql::dynamic::{
Field, FieldFuture, InputValue, ResolverContext, TypeRef, ValueAccessor,
Field, FieldFuture, FieldValue, InputValue, ResolverContext, TypeRef, ValueAccessor,
};
use sea_orm::EntityTrait;
use seaography::{
BuilderContext, EntityDeleteMutationBuilder, EntityObjectBuilder, FilterInputBuilder,
GuardAction,
};
use seaography::{BuilderContext, EntityObjectBuilder, FilterInputBuilder, GuardAction};
use crate::{app::AppContextTrait, errors::RecorderResult};
pub type DeleteMutationFn = Arc<
dyn Fn(
&ResolverContext<'_>,
pub type FilterMutationFn = Arc<
dyn for<'a> Fn(
&ResolverContext<'a>,
Arc<dyn AppContextTrait>,
Option<ValueAccessor<'_>>,
) -> Pin<Box<dyn Future<Output = RecorderResult<Option<i32>>> + Send>>
+ Send
) -> Pin<
Box<dyn Future<Output = RecorderResult<Option<FieldValue<'a>>>> + Send + 'a>,
> + Send
+ Sync,
>;
pub fn generate_custom_entity_delete_mutation_field<T>(
pub fn generate_entity_filter_mutation_field<T, N, R>(
builder_context: &'static BuilderContext,
mutation_fn: DeleteMutationFn,
field_name: N,
type_ref: R,
mutation_fn: FilterMutationFn,
) -> Field
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
N: Into<String>,
R: Into<TypeRef>,
{
let entity_filter_input_builder = FilterInputBuilder {
context: builder_context,
@@ -35,43 +37,38 @@ where
let entity_object_builder = EntityObjectBuilder {
context: builder_context,
};
let entity_delete_mutation_builder = EntityDeleteMutationBuilder {
context: builder_context,
};
let object_name: String = entity_object_builder.type_name::<T>();
let context = builder_context;
let guard = builder_context.guards.entity_guards.get(&object_name);
Field::new(
entity_delete_mutation_builder.type_name::<T>(),
TypeRef::named_nn(TypeRef::INT),
move |ctx| {
let mutation_fn = mutation_fn.clone();
FieldFuture::new(async move {
let guard_flag = if let Some(guard) = guard {
(*guard)(&ctx)
} else {
GuardAction::Allow
};
Field::new(field_name, type_ref, move |ctx| {
let mutation_fn = mutation_fn.clone();
FieldFuture::new(async move {
let guard_flag = if let Some(guard) = guard {
(*guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = guard_flag {
return Err::<Option<_>, async_graphql::Error>(async_graphql::Error::new(
reason.unwrap_or("Entity guard triggered.".into()),
));
}
if let GuardAction::Block(reason) = guard_flag {
return Err::<Option<_>, async_graphql::Error>(async_graphql::Error::new(
reason.unwrap_or("Entity guard triggered.".into()),
));
}
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let filters = ctx.args.get(&context.entity_delete_mutation.filter_field);
let filters = ctx.args.get(&context.entity_delete_mutation.filter_field);
let result = mutation_fn(&ctx, app_ctx.clone(), filters).await?;
let result = mutation_fn(&ctx, app_ctx.clone(), filters)
.await
.map_err(async_graphql::Error::new_with_source)?;
Ok(result.map(async_graphql::Value::from))
})
},
)
Ok(result)
})
})
.argument(InputValue::new(
&context.entity_delete_mutation.filter_field,
TypeRef::named(entity_filter_input_builder.type_name(&object_name)),

View File

@@ -99,7 +99,9 @@ impl Model {
..Default::default()
};
let new_item: Model = new_item.save(&txn).await?.try_into()?;
let new_item: Model = new_item.insert(&txn).await?;
txn.commit().await?;
Ok(new_item)
}

View File

@@ -186,19 +186,13 @@ impl Model {
let subscription_model = Entity::find_by_id(subscription_id)
.one(db)
.await?
.ok_or_else(|| RecorderError::DbError {
source: DbErr::RecordNotFound(format!(
"Subscription id {subscription_id} not found or not belong to subscriber \
{subscriber_id}",
)),
.ok_or_else(|| RecorderError::ModelEntityNotFound {
entity: "Subscription".into(),
})?;
if subscription_model.subscriber_id != subscriber_id {
Err(RecorderError::DbError {
source: DbErr::RecordNotFound(format!(
"Subscription id {subscription_id} not found or not belong to subscriber \
{subscriber_id}",
)),
Err(RecorderError::ModelEntityNotFound {
entity: "Subscription".into(),
})?;
}

View File

@@ -1,7 +0,0 @@
use sea_orm::sea_query;
#[derive(sea_query::Iden)]
pub enum ApalisJob {
Table,
Id,
}

View File

@@ -0,0 +1,16 @@
use sea_orm::sea_query;
#[derive(sea_query::Iden)]
pub enum ApalisSchema {
#[iden = "apalis"]
Schema,
}
#[derive(sea_query::Iden)]
pub enum ApalisJobs {
#[iden = "jobs"]
Table,
Id,
}

View File

@@ -1,13 +1,13 @@
mod config;
mod core;
mod db;
mod r#extern;
mod registry;
mod service;
pub use core::{SUBSCRIBER_TASK_APALIS_NAME, SubscriberAsyncTaskTrait, SubscriberStreamTaskTrait};
pub use config::TaskConfig;
pub use db::ApalisJob;
pub use r#extern::{ApalisJobs, ApalisSchema};
pub use registry::{
SubscriberTask, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant,
SubscriberTaskTypeVariantIter, SyncOneSubscriptionFeedsFullTask,

View File

@@ -1,4 +1,4 @@
use std::{ops::Deref, sync::Arc};
use std::{ops::Deref, str::FromStr, sync::Arc};
use apalis::prelude::*;
use apalis_sql::{
@@ -10,7 +10,7 @@ use tokio::sync::RwLock;
use crate::{
app::AppContextTrait,
errors::RecorderResult,
errors::{RecorderError, RecorderResult},
task::{SUBSCRIBER_TASK_APALIS_NAME, SubscriberTask, TaskConfig},
};
@@ -45,6 +45,19 @@ impl TaskService {
job.run(ctx).await
}
pub async fn retry_subscriber_task(&self, job_id: String) -> RecorderResult<()> {
{
let mut storage = self.subscriber_task_storage.write().await;
let task_id =
TaskId::from_str(&job_id).map_err(|err| RecorderError::InvalidTaskId {
message: err.to_string(),
})?;
let worker_id = WorkerId::new(SUBSCRIBER_TASK_APALIS_NAME);
storage.retry(&worker_id, &task_id).await?;
}
Ok(())
}
pub async fn add_subscriber_task(
&self,
_subscriber_id: i32,
@@ -65,15 +78,23 @@ impl TaskService {
}
pub async fn setup_monitor(&self) -> RecorderResult<Monitor> {
let monitor = Monitor::new();
let worker = WorkerBuilder::new(SUBSCRIBER_TASK_APALIS_NAME)
.catch_panic()
.enable_tracing()
.data(self.ctx.clone())
.backend(self.subscriber_task_storage.read().await.clone())
.build_fn(Self::run_subscriber_task);
let mut monitor = Monitor::new();
Ok(monitor.register(worker))
{
let subscriber_task_worker = WorkerBuilder::new(SUBSCRIBER_TASK_APALIS_NAME)
.catch_panic()
.enable_tracing()
.data(self.ctx.clone())
.backend({
let storage = self.subscriber_task_storage.read().await;
storage.clone()
})
.build_fn(Self::run_subscriber_task);
monitor = monitor.register(subscriber_task_worker);
}
Ok(monitor)
}
pub async fn setup_listener(&self) -> RecorderResult<PgListen> {