feat: refactor tasks

This commit is contained in:
master 2025-05-20 01:23:13 +08:00
parent b772937354
commit b2f327d48f
23 changed files with 393 additions and 151 deletions

13
.vscode/settings.json vendored
View File

@ -29,7 +29,8 @@
"prettier.enable": false,
"typescript.tsdk": "node_modules/typescript/lib",
"rust-analyzer.cargo.features": [
"testcontainers"
"testcontainers",
"playground"
],
"sqltools.connections": [
{
@ -40,6 +41,16 @@
"name": "konobangu-dev",
"database": "konobangu",
"username": "konobangu"
},
{
"previewLimit": 50,
"server": "localhost",
"port": 32770,
"askForPassword": true,
"driver": "PostgreSQL",
"name": "docker-pgsql",
"database": "konobangu",
"username": "konobangu"
}
]
}

View File

@ -15,6 +15,7 @@ required-features = []
[features]
default = []
playground = ["dep:mockito"]
testcontainers = [
"dep:testcontainers",
"dep:testcontainers-modules",
@ -110,17 +111,17 @@ apalis = { version = "0.7", features = ["limit", "tracing", "catch-panic"] }
apalis-sql = { version = "0.7", features = ["postgres"] }
cocoon = { version = "0.4.3", features = ["getrandom", "thiserror"] }
rand = "0.9.1"
rust_decimal = "1.37.1"
reqwest_cookie_store = "0.8.0"
mockito = { version = "1.6.1", optional = true }
downloader = { workspace = true }
util = { workspace = true }
fetch = { workspace = true }
nanoid = "0.4.0"
rust_decimal = "1.37.1"
[dev-dependencies]
serial_test = "3"
insta = { version = "1", features = ["redactions", "yaml", "filters"] }
mockito = "1.6.1"
rstest = "0.25"
ctor = "0.4.0"

View File

@ -1,56 +1,33 @@
use recorder::errors::RecorderResult;
// #![allow(unused_imports)]
// use recorder::{
// app::{AppContext, AppContextTrait},
// errors::RecorderResult,
// migrations::Migrator,
// models::{
// subscribers::SEED_SUBSCRIBER,
// subscriptions::{self, SubscriptionCreateFromRssDto},
// },
// };
// use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
// use sea_orm_migration::MigratorTrait;
#![feature(duration_constructors_lite)]
use std::{sync::Arc, time::Duration};
// async fn pull_mikan_bangumi_rss(ctx: &dyn AppContextTrait) -> RecorderResult<()> {
// let rss_link = "https://mikanani.me/RSS/Bangumi?bangumiId=3416&subgroupid=370";
// // let rss_link =
// // "https://mikanani.me/RSS/MyBangumi?token=FE9tccsML2nBPUUqpCuJW2uJZydAXCntHJ7RpD9LDP8%3d";
// let subscription = if let Some(subscription) =
// subscriptions::Entity::find()
// .filter(subscriptions::Column::SourceUrl.eq(String::from(rss_link)))
// .one(ctx.db())
// .await?
// {
// subscription
// } else {
// subscriptions::Model::add_subscription(
// ctx,
//
// subscriptions::SubscriptionCreateDto::Mikan(SubscriptionCreateFromRssDto {
// rss_link: rss_link.to_string(),
// display_name: String::from("Mikan Project - 我的番组"),
// enabled: Some(true),
// }),
// 1,
// )
// .await?
// };
// subscription.pull_subscription(ctx).await?;
// Ok(())
// }
// #[tokio::main]
// async fn main() -> RecorderResult<()> {
// pull_mikan_bangumi_rss(&ctx).await?;
// Ok(())
// }
use apalis_sql::postgres::PostgresStorage;
use recorder::{
app::AppContextTrait,
errors::RecorderResult,
test_utils::{
app::TestingAppContext,
database::{TestingDatabaseServiceConfig, build_testing_database_service},
},
};
#[tokio::main]
async fn main() -> RecorderResult<()> {
let app_ctx = {
let db_service = build_testing_database_service(TestingDatabaseServiceConfig {
auto_migrate: false,
})
.await?;
Arc::new(TestingAppContext::builder().db(db_service).build())
};
let db = app_ctx.db();
PostgresStorage::setup(db.get_postgres_connection_pool()).await?;
dbg!(db.get_postgres_connection_pool().connect_options());
tokio::time::sleep(Duration::from_hours(1)).await;
Ok(())
}

View File

@ -16,7 +16,7 @@ pub trait DatabaseServiceConnectionTrait {
pub struct DatabaseService {
connection: DatabaseConnection,
#[cfg(all(test, feature = "testcontainers"))]
#[cfg(all(any(test, feature = "playground"), feature = "testcontainers"))]
pub container:
Option<testcontainers::ContainerAsync<testcontainers_modules::postgres::Postgres>>,
}
@ -54,7 +54,7 @@ impl DatabaseService {
let me = Self {
connection: db,
#[cfg(all(test, feature = "testcontainers"))]
#[cfg(all(any(test, feature = "playground"), feature = "testcontainers"))]
container: None,
};
@ -66,18 +66,19 @@ impl DatabaseService {
}
pub async fn migrate_up(&self) -> RecorderResult<()> {
Migrator::up(&self.connection, None).await?;
{
let pool = &self.get_postgres_connection_pool();
PostgresStorage::setup(pool).await?;
}
Migrator::up(&self.connection, None).await?;
Ok(())
}
pub async fn migrate_down(&self) -> RecorderResult<()> {
Migrator::down(&self.connection, None).await?;
{
let _pool = &self.get_postgres_connection_pool();
self.execute_unprepared(r#"DROP SCHEMA IF EXISTS apalis CASCADE"#)
.await?;
}
Ok(())
}

View File

@ -76,7 +76,7 @@ pub enum RecorderError {
},
#[snafu(transparent)]
HttpClientError { source: HttpClientError },
#[cfg(all(feature = "testcontainers", test))]
#[cfg(all(any(test, feature = "playground"), feature = "testcontainers"))]
#[snafu(transparent)]
TestcontainersError {
source: testcontainers::TestcontainersError,

View File

@ -253,7 +253,7 @@ mod tests {
use super::*;
use crate::test_utils::{
app::UnitTestAppContext,
app::TestingAppContext,
crypto::build_testing_crypto_service,
database::build_testing_database_service,
mikan::{MikanMockServer, build_testing_mikan_client, build_testing_mikan_credential_form},
@ -264,9 +264,9 @@ mod tests {
mikan_base_url: Url,
) -> RecorderResult<Arc<dyn AppContextTrait>> {
let mikan_client = build_testing_mikan_client(mikan_base_url.clone()).await?;
let db_service = build_testing_database_service().await?;
let db_service = build_testing_database_service(Default::default()).await?;
let crypto_service = build_testing_crypto_service().await?;
let ctx = UnitTestAppContext::builder()
let ctx = TestingAppContext::builder()
.db(db_service)
.crypto(crypto_service)
.mikan(mikan_client)

View File

@ -967,7 +967,7 @@ mod test {
use crate::{
extract::mikan::{MIKAN_BANGUMI_EXPAND_SUBSCRIBED_PAGE_PATH, MIKAN_SEASON_FLOW_PAGE_PATH},
test_utils::{
app::UnitTestAppContext,
app::TestingAppContext,
crypto::build_testing_crypto_service,
database::build_testing_database_service,
mikan::{
@ -1195,9 +1195,9 @@ mod test {
let app_ctx = {
let mikan_client = build_testing_mikan_client(mikan_base_url.clone()).await?;
let db_service = build_testing_database_service().await?;
let db_service = build_testing_database_service(Default::default()).await?;
let crypto_service = build_testing_crypto_service().await?;
let app_ctx = UnitTestAppContext::builder()
let app_ctx = TestingAppContext::builder()
.mikan(mikan_client)
.db(db_service)
.crypto(crypto_service)

View File

@ -1,12 +1,17 @@
use async_graphql::dynamic::SchemaError;
use async_graphql::{
Error as GraphqlError, InputValueResult, Scalar, ScalarType, dynamic::SchemaError, to_value,
};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use rust_decimal::{Decimal, prelude::FromPrimitive};
use sea_orm::{
Condition,
Condition, EntityTrait,
sea_query::{ArrayType, Expr, ExprTrait, IntoLikeExpr, SimpleExpr, Value as DbValue},
};
use seaography::{BuilderContext, FilterInfo, SeaographyError};
use serde_json::Value as JsonValue;
use super::subscriber::FnFilterCondition;
use crate::errors::RecorderResult;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
@ -317,7 +322,7 @@ fn json_path_type_assert_expr(
typestr: &str,
) -> SimpleExpr {
Expr::cust_with_exprs(
format!("jsonb_path_exists($1, $2 || ' ? (@.type = \"{typestr}\")')"),
format!("jsonb_path_exists($1, $2 || ' ? (@.type() = \"{typestr}\")')"),
[col_expr.into(), json_path_expr(path)],
)
}
@ -767,8 +772,7 @@ where
.map(|(i, v)| (JsonIndex::Num(i as u64), v))
.collect(),
_ => Err(SchemaError(format!(
"Json filter input node must be an object or array, but got {}",
node.to_string()
"Json filter input node must be an object or array, but got {node}"
)))?,
};
let mut conditions = Condition::all();
@ -866,6 +870,46 @@ where
Ok(condition)
}
#[derive(Clone, Debug)]
pub struct JsonFilterInput(pub serde_json::Value);
#[Scalar(name = "JsonFilterInput")]
impl ScalarType for JsonFilterInput {
fn parse(value: async_graphql::Value) -> InputValueResult<Self> {
Ok(JsonFilterInput(value.into_json()?))
}
fn to_value(&self) -> async_graphql::Value {
async_graphql::Value::from_json(self.0.clone()).unwrap()
}
}
pub static JSONB_FILTER_INFO: OnceCell<FilterInfo> = OnceCell::new();
pub fn jsonb_filter_condition_function<T>(
_context: &BuilderContext,
column: &T::Column,
) -> FnFilterCondition
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let column = *column;
Box::new(move |mut condition, filter| {
let filter_value = to_value(filter.as_index_map())
.map_err(|e| SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e)))?;
let filter = JsonFilterInput::parse(filter_value)
.map_err(|e| SeaographyError::AsyncGraphQLError(GraphqlError::new(format!("{e:?}"))))?;
let cond_where = prepare_json_filter_input(&Expr::col(column), filter.0)
.map_err(|e| SeaographyError::AsyncGraphQLError(GraphqlError::new_with_source(e)))?;
condition = condition.add(cond_where);
Ok(condition)
})
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
@ -965,7 +1009,7 @@ mod tests {
assert_eq!(
sql,
"SELECT \"job\" FROM \"test_table\" WHERE jsonb_path_exists(\"test_table\".\"job\", \
$1 || ' ? (@.type = \"string\")')"
$1 || ' ? (@.type() = \"string\")')"
);
}
@ -981,7 +1025,7 @@ mod tests {
assert_eq!(
sql,
"SELECT \"job\" FROM \"test_table\" WHERE (CASE WHEN \
((jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type = \"array\")')) \
((jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type() = \"array\")')) \
AND (jsonb_path_query_first(\"test_table\".\"job\", $2) @> $3)) THEN true ELSE \
false END) = (true)"
);
@ -1000,9 +1044,9 @@ mod tests {
assert_eq!(
sql,
"SELECT \"job\" FROM \"test_table\" WHERE (CASE WHEN \
((jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type = \"array\")')) \
((jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type() = \"array\")')) \
AND (jsonb_path_query_first(\"test_table\".\"job\", $2) @> $3)) THEN true WHEN \
((jsonb_path_exists(\"test_table\".\"job\", $4 || ' ? (@.type = \"string\")')) \
((jsonb_path_exists(\"test_table\".\"job\", $4 || ' ? (@.type() = \"string\")')) \
AND CAST((jsonb_path_query_first(\"test_table\".\"job\", $5)) AS text) LIKE $6) \
THEN true ELSE false END) = (true)"
);
@ -1028,7 +1072,7 @@ mod tests {
assert_eq!(
sql,
"SELECT \"job\" FROM \"test_table\" WHERE \
(jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type = \"number\")')) \
(jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type() = \"number\")')) \
AND (CAST((jsonb_path_query_first(\"test_table\".\"job\", $2)) AS numeric) \
BETWEEN $3 AND $4)"
);
@ -1048,7 +1092,7 @@ mod tests {
assert_eq!(
sql,
"SELECT \"job\" FROM \"test_table\" WHERE \
(jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type = \"string\")')) \
(jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type() = \"string\")')) \
AND (CAST((jsonb_path_query_first(\"test_table\".\"job\", $2)) AS text) BETWEEN \
$3 AND $4)"
);
@ -1068,7 +1112,7 @@ mod tests {
assert_eq!(
sql,
"SELECT \"job\" FROM \"test_table\" WHERE \
(jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type = \"boolean\")')) \
(jsonb_path_exists(\"test_table\".\"job\", $1 || ' ? (@.type() = \"boolean\")')) \
AND (CAST((jsonb_path_query_first(\"test_table\".\"job\", $2)) AS boolean) \
BETWEEN $3 AND $4)"
);
@ -1090,7 +1134,7 @@ mod tests {
assert_eq!(
sql,
"SELECT \"job\" FROM \"test_table\" WHERE (jsonb_path_exists(\"test_table\".\"job\", \
$1 || ' ? (@.type = \"string\")')) AND \
$1 || ' ? (@.type() = \"string\")')) AND \
CAST((jsonb_path_query_first(\"test_table\".\"job\", $2)) AS text) LIKE $3"
);
assert_eq!(params.len(), 3);
@ -1109,7 +1153,7 @@ mod tests {
assert_eq!(
sql,
"SELECT \"job\" FROM \"test_table\" WHERE (jsonb_path_exists(\"test_table\".\"job\", \
$1 || ' ? (@.type = \"string\")')) AND \
$1 || ' ? (@.type() = \"string\")')) AND \
(starts_with(CAST((jsonb_path_query_first(\"test_table\".\"job\", $2)) AS text), $3))"
);
assert_eq!(params.len(), 3);
@ -1128,7 +1172,7 @@ mod tests {
assert_eq!(
sql,
"SELECT \"job\" FROM \"test_table\" WHERE (jsonb_path_exists(\"test_table\".\"job\", \
$1 || ' ? (@.type = \"string\")')) AND \
$1 || ' ? (@.type() = \"string\")')) AND \
CAST((jsonb_path_query_first(\"test_table\".\"job\", $2)) AS text) LIKE $3"
);
assert_eq!(params.len(), 3);

View File

@ -1,18 +1,13 @@
mod json;
mod subscriber;
use async_graphql::{
InputValueResult, Scalar, ScalarType,
dynamic::{ObjectAccessor, TypeRef},
};
pub use json::prepare_json_filter_input;
use std::borrow::Cow;
use async_graphql::dynamic::TypeRef;
pub use json::{JSONB_FILTER_INFO, jsonb_filter_condition_function};
use maplit::btreeset;
use once_cell::sync::OnceCell;
use sea_orm::{ColumnTrait, Condition, EntityTrait};
use seaography::{
BuilderContext, FilterInfo, FilterOperation as SeaographqlFilterOperation, SeaResult,
};
pub static SUBSCRIBER_ID_FILTER_INFO: OnceCell<FilterInfo> = OnceCell::new();
use seaography::{FilterInfo, FilterOperation as SeaographqlFilterOperation};
pub use subscriber::{SUBSCRIBER_ID_FILTER_INFO, subscriber_id_condition_function};
pub fn init_custom_filter_info() {
SUBSCRIBER_ID_FILTER_INFO.get_or_init(|| FilterInfo {
@ -20,49 +15,9 @@ pub fn init_custom_filter_info() {
base_type: TypeRef::INT.into(),
supported_operations: btreeset! { SeaographqlFilterOperation::Equals },
});
}
pub type FnFilterCondition =
Box<dyn Fn(Condition, &ObjectAccessor) -> SeaResult<Condition> + Send + Sync>;
pub fn subscriber_id_condition_function<T>(
_context: &BuilderContext,
column: &T::Column,
) -> FnFilterCondition
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let column = *column;
Box::new(move |mut condition, filter| {
let subscriber_id_filter_info = SUBSCRIBER_ID_FILTER_INFO.get().unwrap();
let operations = &subscriber_id_filter_info.supported_operations;
for operation in operations {
match operation {
SeaographqlFilterOperation::Equals => {
if let Some(value) = filter.get("eq") {
let value: i32 = value.i64()?.try_into()?;
let value = sea_orm::Value::Int(Some(value));
condition = condition.add(column.eq(value));
}
}
_ => unreachable!("unreachable filter operation for subscriber_id"),
}
}
Ok(condition)
})
}
#[derive(Clone, Debug)]
pub struct JsonFilterInput(pub serde_json::Value);
#[Scalar(name = "JsonFilterInput")]
impl ScalarType for JsonFilterInput {
fn parse(value: async_graphql::Value) -> InputValueResult<Self> {
Ok(JsonFilterInput(value.into_json()?))
}
fn to_value(&self) -> async_graphql::Value {
async_graphql::Value::from_json(self.0.clone()).unwrap()
}
JSONB_FILTER_INFO.get_or_init(|| FilterInfo {
type_name: String::from("JsonbFilterInput"),
base_type: TypeRef::Named(Cow::Borrowed("serde_json::Value")).to_string(),
supported_operations: btreeset! { SeaographqlFilterOperation::Equals },
});
}

View File

@ -0,0 +1,39 @@
use async_graphql::dynamic::ObjectAccessor;
use once_cell::sync::OnceCell;
use sea_orm::{ColumnTrait, Condition, EntityTrait};
use seaography::{
BuilderContext, FilterInfo, FilterOperation as SeaographqlFilterOperation, SeaResult,
};
pub static SUBSCRIBER_ID_FILTER_INFO: OnceCell<FilterInfo> = OnceCell::new();
pub type FnFilterCondition =
Box<dyn Fn(Condition, &ObjectAccessor) -> SeaResult<Condition> + Send + Sync>;
pub fn subscriber_id_condition_function<T>(
_context: &BuilderContext,
column: &T::Column,
) -> FnFilterCondition
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let column = *column;
Box::new(move |mut condition, filter| {
let subscriber_id_filter_info = SUBSCRIBER_ID_FILTER_INFO.get().unwrap();
let operations = &subscriber_id_filter_info.supported_operations;
for operation in operations {
match operation {
SeaographqlFilterOperation::Equals => {
if let Some(value) = filter.get("eq") {
let value: i32 = value.i64()?.try_into()?;
let value = sea_orm::Value::Int(Some(value));
condition = condition.add(column.eq(value));
}
}
_ => unreachable!("unreachable filter operation for subscriber_id"),
}
}
Ok(condition)
})
}

View File

@ -5,7 +5,8 @@ use seaography::{Builder, BuilderContext, FilterType, FilterTypesMapHelper};
use crate::graphql::infra::{
filter::{
SUBSCRIBER_ID_FILTER_INFO, init_custom_filter_info, subscriber_id_condition_function,
JSONB_FILTER_INFO, SUBSCRIBER_ID_FILTER_INFO, init_custom_filter_info,
subscriber_id_condition_function,
},
guard::{guard_entity_with_subscriber_id, guard_field_with_subscriber_id},
transformer::{filter_condition_transformer, mutation_input_object_transformer},
@ -26,6 +27,20 @@ fn restrict_filter_input_for_entity<T>(
context.filter_types.overwrites.insert(key, filter_type);
}
fn restrict_jsonb_filter_input_for_entity<T>(context: &mut BuilderContext, column: &T::Column)
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_column_key = get_entity_column_key::<T>(context, column);
context.filter_types.overwrites.insert(
entity_column_key.clone(),
Some(FilterType::Custom(
JSONB_FILTER_INFO.get().unwrap().type_name.clone(),
)),
);
}
fn restrict_subscriber_for_entity<T>(context: &mut BuilderContext, column: &T::Column)
where
T: EntityTrait,
@ -118,6 +133,14 @@ pub fn schema(
&mut context,
&subscription_episode::Column::SubscriberId,
);
restrict_subscriber_for_entity::<subscriber_tasks::Entity>(
&mut context,
&subscriber_tasks::Column::SubscriberId,
);
restrict_jsonb_filter_input_for_entity::<subscriber_tasks::Entity>(
&mut context,
&subscriber_tasks::Column::Job,
);
for column in subscribers::Column::iter() {
if !matches!(column, subscribers::Column::Id) {
restrict_filter_input_for_entity::<subscribers::Entity>(
@ -159,6 +182,7 @@ pub fn schema(
subscription_bangumi,
subscription_episode,
subscriptions,
subscriber_tasks,
]
);

View File

@ -9,6 +9,7 @@
associated_type_defaults,
let_chains
)]
#![allow(clippy::enum_variant_names)]
pub use downloader;
pub mod app;
@ -25,6 +26,6 @@ pub mod migrations;
pub mod models;
pub mod storage;
pub mod task;
#[cfg(test)]
#[cfg(any(test, feature = "playground"))]
pub mod test_utils;
pub mod web;

View File

@ -181,6 +181,17 @@ impl MigrationTrait for Migration {
.on_update(ForeignKeyAction::Cascade)
.on_delete(ForeignKeyAction::Cascade),
)
.foreign_key(
ForeignKey::create()
.name("fk_subscription_bangumi_subscriber_id")
.from(
SubscriptionBangumi::Table,
SubscriptionBangumi::SubscriberId,
)
.to(Subscribers::Table, Subscribers::Id)
.on_update(ForeignKeyAction::Cascade)
.on_delete(ForeignKeyAction::Cascade),
)
.index(
Index::create()
.if_not_exists()
@ -299,6 +310,17 @@ impl MigrationTrait for Migration {
.on_update(ForeignKeyAction::Cascade)
.on_delete(ForeignKeyAction::Cascade),
)
.foreign_key(
ForeignKey::create()
.name("fk_subscription_episode_subscriber_id")
.from(
SubscriptionEpisode::Table,
SubscriptionEpisode::SubscriberId,
)
.to(Subscribers::Table, Subscribers::Id)
.on_update(ForeignKeyAction::Cascade)
.on_delete(ForeignKeyAction::Cascade),
)
.index(
Index::create()
.if_not_exists()

View File

@ -0,0 +1,64 @@
use async_trait::async_trait;
use sea_orm_migration::prelude::*;
use crate::task::SUBSCRIBER_TASK_APALIS_NAME;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
db.execute_unprepared(&format!(
r#"CREATE VIEW IF NOT EXISTS subscriber_task AS
SELECT
job,
task_type,
status,
(job->'subscriber_id')::integer AS subscriber_id,
(job->'task_type')::text AS task_type,
id,
attempts,
max_attempts,
run_at,
last_error,
lock_at,
lock_by,
done_at,
priority
FROM apalis.jobs
WHERE job_type = {SUBSCRIBER_TASK_APALIS_NAME}
AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#,
))
.await?;
db.execute_unprepared(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_apalis_jobs_subscriber_id
ON apalis.jobs ((job -> 'subscriber_id'))
WHERE job_type = {SUBSCRIBER_TASK_APALIS_NAME}
AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#
))
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
db.execute_unprepared(
r#"DROP INDEX IF EXISTS idx_apalis_jobs_subscriber_id
ON apalis.jobs"#,
)
.await?;
db.execute_unprepared("DROP VIEW IF EXISTS subscriber_task")
.await?;
Ok(())
}
}

View File

@ -7,6 +7,7 @@ pub mod m20220101_000001_init;
pub mod m20240224_082543_add_downloads;
pub mod m20241231_000001_auth;
pub mod m20250501_021523_credential_3rd;
pub mod m20250520_021135_subscriber_tasks;
pub struct Migrator;
@ -18,6 +19,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240224_082543_add_downloads::Migration),
Box::new(m20241231_000001_auth::Migration),
Box::new(m20250501_021523_credential_3rd::Migration),
Box::new(m20250520_021135_subscriber_tasks::Migration),
]
}
}

View File

@ -6,13 +6,42 @@ use crate::task::SubscriberTask;
#[sea_orm(table_name = "subscriber_tasks")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub id: String,
pub subscriber_id: i32,
pub job: SubscriberTask,
pub state: String,
pub status: String,
pub attempts: i32,
pub max_attempts: i32,
pub run_at: DateTimeUtc,
pub last_error: Option<String>,
pub lock_at: Option<DateTimeUtc>,
pub lock_by: Option<String>,
pub done_at: Option<DateTimeUtc>,
pub priority: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
pub enum Relation {
#[sea_orm(
belongs_to = "super::subscribers::Entity",
from = "Column::SubscriberId",
to = "super::subscribers::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
Subscriber,
}
impl Related<super::subscribers::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscriber.def()
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
pub enum RelatedEntity {
#[sea_orm(entity = "super::subscribers::Entity")]
Subscriber,
}
impl ActiveModelBehavior for ActiveModel {}

View File

@ -32,6 +32,14 @@ pub enum Relation {
on_delete = "Cascade"
)]
Bangumi,
#[sea_orm(
belongs_to = "super::subscribers::Entity",
from = "Column::SubscriberId",
to = "super::subscribers::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
Subscriber,
}
impl Related<super::subscriptions::Entity> for Entity {
@ -46,12 +54,20 @@ impl Related<super::bangumi::Entity> for Entity {
}
}
impl Related<super::subscribers::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscriber.def()
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
pub enum RelatedEntity {
#[sea_orm(entity = "super::subscriptions::Entity")]
Subscription,
#[sea_orm(entity = "super::bangumi::Entity")]
Bangumi,
#[sea_orm(entity = "super::subscribers::Entity")]
Subscriber,
}
#[async_trait]

View File

@ -32,6 +32,14 @@ pub enum Relation {
on_delete = "Cascade"
)]
Episode,
#[sea_orm(
belongs_to = "super::subscribers::Entity",
from = "Column::SubscriberId",
to = "super::subscribers::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
Subscriber,
}
impl Related<super::subscriptions::Entity> for Entity {
@ -46,12 +54,20 @@ impl Related<super::episodes::Entity> for Entity {
}
}
impl Related<super::subscribers::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscriber.def()
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
pub enum RelatedEntity {
#[sea_orm(entity = "super::subscriptions::Entity")]
Subscription,
#[sea_orm(entity = "super::episodes::Entity")]
Episode,
#[sea_orm(entity = "super::subscribers::Entity")]
Subscriber,
}
#[async_trait]

View File

@ -1,7 +1,7 @@
use std::{ops::Deref, sync::Arc};
use apalis::prelude::*;
use apalis_sql::postgres::PostgresStorage;
use apalis_sql::{Config, postgres::PostgresStorage};
use tokio::sync::RwLock;
use crate::{
@ -22,7 +22,11 @@ impl TaskService {
ctx: Arc<dyn AppContextTrait>,
) -> RecorderResult<Self> {
let pool = ctx.db().get_postgres_connection_pool().clone();
let subscriber_task_storage = Arc::new(RwLock::new(PostgresStorage::new(pool)));
let storage_config = Config::new(SUBSCRIBER_TASK_APALIS_NAME);
let subscriber_task_storage = Arc::new(RwLock::new(PostgresStorage::new_with_config(
pool,
storage_config,
)));
Ok(Self {
config,

View File

@ -1,12 +1,13 @@
use std::fmt::Debug;
use std::{fmt::Debug, sync::Arc};
use once_cell::sync::OnceCell;
use typed_builder::TypedBuilder;
use crate::app::AppContextTrait;
#[derive(TypedBuilder)]
#[builder(field_defaults(default, setter(strip_option)))]
pub struct UnitTestAppContext {
pub struct TestingAppContext {
logger: Option<crate::logger::LoggerService>,
db: Option<crate::database::DatabaseService>,
config: Option<crate::app::AppConfig>,
@ -16,7 +17,8 @@ pub struct UnitTestAppContext {
graphql: Option<crate::graphql::GraphQLService>,
storage: Option<crate::storage::StorageService>,
crypto: Option<crate::crypto::CryptoService>,
task: Option<crate::task::TaskService>,
#[builder(default = Arc::new(OnceCell::new()), setter(!strip_option))]
task: Arc<OnceCell<crate::task::TaskService>>,
message: Option<crate::message::MessageService>,
#[builder(default = Some(String::from(env!("CARGO_MANIFEST_DIR"))))]
working_dir: Option<String>,
@ -24,13 +26,19 @@ pub struct UnitTestAppContext {
environment: crate::app::Environment,
}
impl Debug for UnitTestAppContext {
impl TestingAppContext {
pub fn set_task(&self, task: crate::task::TaskService) {
self.task.get_or_init(|| task);
}
}
impl Debug for TestingAppContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "UnitTestAppContext")
}
}
impl AppContextTrait for UnitTestAppContext {
impl AppContextTrait for TestingAppContext {
fn logger(&self) -> &crate::logger::LoggerService {
self.logger.as_ref().expect("should set logger")
}
@ -76,7 +84,7 @@ impl AppContextTrait for UnitTestAppContext {
}
fn task(&self) -> &crate::task::TaskService {
self.task.as_ref().expect("should set tasks")
self.task.get().expect("should set task")
}
fn message(&self) -> &crate::message::MessageService {

View File

@ -3,8 +3,20 @@ use crate::{
errors::RecorderResult,
};
pub struct TestingDatabaseServiceConfig {
pub auto_migrate: bool,
}
impl Default for TestingDatabaseServiceConfig {
fn default() -> Self {
Self { auto_migrate: true }
}
}
#[cfg(feature = "testcontainers")]
pub async fn build_testing_database_service() -> RecorderResult<DatabaseService> {
pub async fn build_testing_database_service(
config: TestingDatabaseServiceConfig,
) -> RecorderResult<DatabaseService> {
use testcontainers::{ImageExt, runners::AsyncRunner};
use testcontainers_ext::{ImageDefaultLogConsumerExt, ImagePruneExistedLabelExt};
use testcontainers_modules::postgres::Postgres;
@ -34,7 +46,7 @@ pub async fn build_testing_database_service() -> RecorderResult<DatabaseService>
connect_timeout: 5000,
idle_timeout: 10000,
acquire_timeout: None,
auto_migrate: true,
auto_migrate: config.auto_migrate,
})
.await?;
db_service.container = Some(container);

View File

@ -3,4 +3,5 @@ pub mod crypto;
pub mod database;
pub mod mikan;
pub mod storage;
pub mod task;
pub mod tracing;

View File

@ -0,0 +1,15 @@
use std::sync::Arc;
use crate::{
app::AppContextTrait,
errors::RecorderResult,
task::{TaskConfig, TaskService},
};
pub async fn build_testing_task_service(
ctx: Arc<dyn AppContextTrait>,
) -> RecorderResult<TaskService> {
let config = TaskConfig {};
let task_service = TaskService::from_config_and_ctx(config, ctx).await?;
Ok(task_service)
}