feat: more task system

This commit is contained in:
master 2025-05-10 02:34:11 +08:00
parent 9d58d961bd
commit d4bdc677a9
43 changed files with 1180 additions and 835 deletions

Cargo.lock (generated)

File diff suppressed because it is too large.

View File

@ -13,8 +13,8 @@ moka = "0.12"
futures = "0.3"
quirks_path = "0.1"
snafu = { version = "0.8", features = ["futures"] }
-testcontainers = { version = "0.23.3" }
-testcontainers-modules = { version = "0.11.4" }
+testcontainers = { version = "0.24" }
+testcontainers-modules = { version = "0.12" }
testcontainers-ext = { version = "0.1.0", features = ["tracing"] }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["macros", "fs", "rt-multi-thread"] }

View File

@ -67,19 +67,9 @@ impl AppBuilder {
}
pub async fn build(self) -> RecorderResult<App> {
-        AppConfig::load_dotenv(
-            &self.environment,
-            &self.working_dir,
-            self.dotenv_file.as_deref(),
-        )
-        .await?;
+        self.load_env().await?;
-        let config = AppConfig::load_config(
-            &self.environment,
-            &self.working_dir,
-            self.config_file.as_deref(),
-        )
-        .await?;
+        let config = self.load_config().await?;
let app_context =
AppContext::new(self.environment.clone(), config, self.working_dir.clone()).await?;
@ -90,6 +80,26 @@ impl AppBuilder {
})
}
pub async fn load_env(&self) -> RecorderResult<()> {
AppConfig::load_dotenv(
&self.environment,
&self.working_dir,
self.dotenv_file.as_deref(),
)
.await?;
Ok(())
}
pub async fn load_config(&self) -> RecorderResult<AppConfig> {
let config = AppConfig::load_config(
&self.environment,
&self.working_dir,
self.config_file.as_deref(),
)
.await?;
Ok(config)
}
pub fn working_dir(self, working_dir: String) -> Self {
let mut ret = self;
ret.working_dir = working_dir;

View File

@ -20,3 +20,5 @@ complexity_limit = inf
[crypto]
[task]
[message]

View File

@ -11,7 +11,8 @@ use super::env::Environment;
use crate::{
auth::AuthConfig, cache::CacheConfig, crypto::CryptoConfig, database::DatabaseConfig,
errors::RecorderResult, extract::mikan::MikanConfig, graphql::GraphQLConfig,
-    logger::LoggerConfig, storage::StorageConfig, tasks::TaskConfig, web::WebServerConfig,
+    logger::LoggerConfig, message::MessageConfig, storage::StorageConfig, task::TaskConfig,
+    web::WebServerConfig,
};
const DEFAULT_CONFIG_MIXIN: &str = include_str!("./default_mixin.toml");
@ -28,7 +29,8 @@ pub struct AppConfig {
pub graphql: GraphQLConfig,
pub logger: LoggerConfig,
pub database: DatabaseConfig,
-    pub tasks: TaskConfig,
+    pub task: TaskConfig,
+    pub message: MessageConfig,
}
impl AppConfig {

View File

@ -12,8 +12,9 @@ use crate::{
extract::mikan::MikanClient,
graphql::GraphQLService,
logger::LoggerService,
+    message::MessageService,
     storage::{StorageService, StorageServiceTrait},
-    tasks::TaskService,
+    task::TaskService,
};
pub trait AppContextTrait: Send + Sync + Debug {
@ -29,6 +30,7 @@ pub trait AppContextTrait: Send + Sync + Debug {
fn environment(&self) -> &Environment;
fn crypto(&self) -> &CryptoService;
fn task(&self) -> &TaskService;
fn message(&self) -> &MessageService;
}
pub struct AppContext {
@ -43,6 +45,7 @@ pub struct AppContext {
crypto: CryptoService,
working_dir: String,
environment: Environment,
message: MessageService,
task: OnceCell<TaskService>,
}
@ -58,6 +61,7 @@ impl AppContext {
let cache = CacheService::from_config(config.cache).await?;
let db = DatabaseService::from_config(config.database).await?;
let storage = StorageService::from_config(config.storage).await?;
let message = MessageService::from_config(config.message).await?;
let auth = AuthService::from_conf(config.auth).await?;
let mikan = MikanClient::from_config(config.mikan).await?;
let crypto = CryptoService::from_config(config.crypto).await?;
@ -75,12 +79,13 @@ impl AppContext {
working_dir: working_dir.to_string(),
graphql,
crypto,
message,
task: OnceCell::new(),
});
ctx.task
.get_or_try_init(async || {
-                TaskService::from_config_and_ctx(config.tasks, ctx.clone()).await
+                TaskService::from_config_and_ctx(config.task, ctx.clone()).await
})
.await?;
@ -131,4 +136,7 @@ impl AppContextTrait for AppContext {
fn task(&self) -> &TaskService {
self.task.get().expect("task should be set")
}
fn message(&self) -> &MessageService {
&self.message
}
}
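Note the two initialization styles above: `MessageService` is constructed eagerly in `AppContext::new`, while `TaskService` sits behind a `OnceCell` because it needs an `Arc` of the very context that owns it. A minimal sketch of reaching both through the trait object (the function is hypothetical, not part of this commit):

use std::sync::Arc;

use recorder::{app::AppContextTrait, errors::RecorderResult};

// Hypothetical consumer: handlers depend only on `dyn AppContextTrait`,
// so both new services are reachable without naming `AppContext` itself.
async fn warm_up(ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
    let _message = ctx.message(); // plain field, built in `AppContext::new`
    let _task = ctx.task(); // lazily initialized via `OnceCell`
    Ok(())
}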

View File

@ -0,0 +1,16 @@
use recorder::{app::AppBuilder, database::DatabaseService, errors::RecorderResult};
#[tokio::main]
async fn main() -> RecorderResult<()> {
let builder = AppBuilder::from_main_cli(None).await?;
builder.load_env().await?;
let mut database_config = builder.load_config().await?.database;
database_config.auto_migrate = false;
let database_service = DatabaseService::from_config(database_config).await?;
database_service.migrate_down().await?;
Ok(())
}
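The `load_env`/`load_config` split in `AppBuilder` exists exactly for one-off binaries like this, which need configuration but not a full `AppContext`. A `migrate_up` counterpart would follow the same shape; a sketch, assuming it mirrors `migrate_down` above (not part of this commit):

use recorder::{app::AppBuilder, database::DatabaseService, errors::RecorderResult};

#[tokio::main]
async fn main() -> RecorderResult<()> {
    let builder = AppBuilder::from_main_cli(None).await?;
    builder.load_env().await?;
    // Disable auto-migration so constructing the service does not also migrate.
    let mut database_config = builder.load_config().await?.database;
    database_config.auto_migrate = false;
    let database_service = DatabaseService::from_config(database_config).await?;
    database_service.migrate_up().await?;
    Ok(())
}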

View File

@ -52,19 +52,34 @@ impl DatabaseService {
// .await?;
// }
-        if config.auto_migrate {
-            Migrator::up(&db, None).await?;
-            {
-                let pool = db.get_postgres_connection_pool();
-                PostgresStorage::setup(pool).await?;
-            }
-        }
-        Ok(Self {
+        let me = Self {
             connection: db,
             #[cfg(all(test, feature = "testcontainers"))]
             container: None,
-        })
+        };
+        if config.auto_migrate {
+            me.migrate_up().await?;
+        }
+        Ok(me)
}
pub async fn migrate_up(&self) -> RecorderResult<()> {
Migrator::up(&self.connection, None).await?;
{
let pool = &self.get_postgres_connection_pool();
PostgresStorage::setup(pool).await?;
}
Ok(())
}
pub async fn migrate_down(&self) -> RecorderResult<()> {
Migrator::down(&self.connection, None).await?;
{
let _pool = &self.get_postgres_connection_pool();
}
Ok(())
}
}

View File

@ -29,6 +29,7 @@ pub use web::{
extract_mikan_episode_meta_from_episode_homepage_html,
scrape_mikan_bangumi_meta_from_bangumi_homepage_url,
scrape_mikan_bangumi_meta_list_from_season_flow_url,
scrape_mikan_bangumi_meta_stream_from_season_flow_url,
scrape_mikan_episode_meta_from_episode_homepage_url, scrape_mikan_poster_data_from_image_url,
scrape_mikan_poster_meta_from_image_url,
};

View File

@ -1,7 +1,9 @@
use std::{borrow::Cow, fmt, sync::Arc};
use async_stream::try_stream;
use bytes::Bytes;
use fetch::{html::fetch_html, image::fetch_image};
use futures::{Stream, TryStreamExt, pin_mut};
use html_escape::decode_html_entities;
use scraper::{Html, Selector};
use serde::{Deserialize, Serialize};
@ -24,7 +26,7 @@ use crate::{
storage::{StorageContentCategory, StorageServiceTrait},
};
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Eq)]
pub struct MikanBangumiIndexMeta {
pub homepage: Url,
pub origin_poster_src: Option<Url>,
@ -32,13 +34,13 @@ pub struct MikanBangumiIndexMeta {
pub mikan_bangumi_id: String,
}
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Eq)]
pub struct MikanFansubMeta {
pub mikan_fansub_id: String,
pub fansub: String,
}
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Eq)]
pub struct MikanBangumiMeta {
pub homepage: Url,
pub origin_poster_src: Option<Url>,
@ -675,66 +677,83 @@ pub fn extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
}
}
pub fn scrape_mikan_bangumi_meta_stream_from_season_flow_url(
ctx: Arc<dyn AppContextTrait>,
mikan_season_flow_url: Url,
credential_id: i32,
) -> impl Stream<Item = RecorderResult<MikanBangumiMeta>> {
try_stream! {
let mikan_client = ctx.mikan()
.fork_with_credential(ctx.clone(), credential_id)
.await?;
let mikan_base_url = mikan_client.base_url();
let content = fetch_html(&mikan_client, mikan_season_flow_url.clone()).await?;
let mut bangumi_indices_meta = {
let html = Html::parse_document(&content);
extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, mikan_base_url)
};
if bangumi_indices_meta.is_empty() && !mikan_client.has_login().await? {
mikan_client.login().await?;
let content = fetch_html(&mikan_client, mikan_season_flow_url).await?;
let html = Html::parse_document(&content);
bangumi_indices_meta =
extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, mikan_base_url);
}
mikan_client
.sync_credential_cookies(ctx.clone(), credential_id)
.await?;
for bangumi_index in bangumi_indices_meta {
let bangumi_title = bangumi_index.bangumi_title.clone();
let bangumi_expand_subscribed_fragment_url = build_mikan_bangumi_expand_subscribed_url(
mikan_base_url.clone(),
&bangumi_index.mikan_bangumi_id,
);
let bangumi_expand_subscribed_fragment =
fetch_html(&mikan_client, bangumi_expand_subscribed_fragment_url).await?;
let bangumi_meta = {
let html = Html::parse_document(&bangumi_expand_subscribed_fragment);
extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
&html,
bangumi_index,
mikan_base_url.clone(),
)
.with_whatever_context::<_, String, RecorderError>(|| {
format!("failed to extract mikan bangumi fansub of title = {bangumi_title}")
})
}?;
yield bangumi_meta;
}
mikan_client
.sync_credential_cookies(ctx, credential_id)
.await?;
}
}
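Since `try_stream!` produces an `!Unpin` generator, callers pin it before polling. A hypothetical consumer that handles each bangumi as it arrives rather than collecting the whole season:

use std::sync::Arc;

use futures::{TryStreamExt, pin_mut};
use url::Url;

use crate::{app::AppContextTrait, errors::RecorderResult};

async fn log_season(
    ctx: Arc<dyn AppContextTrait>,
    mikan_season_flow_url: Url,
    credential_id: i32,
) -> RecorderResult<()> {
    let stream = scrape_mikan_bangumi_meta_stream_from_season_flow_url(
        ctx,
        mikan_season_flow_url,
        credential_id,
    );
    pin_mut!(stream); // required before calling `try_next` on the generator
    while let Some(meta) = stream.try_next().await? {
        // results already yielded stay usable even if a later page fails
        tracing::info!(?meta, "scraped bangumi");
    }
    Ok(())
}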
#[instrument(err, skip_all, fields(mikan_season_flow_url = mikan_season_flow_url.as_str(), credential_id = credential_id))]
pub async fn scrape_mikan_bangumi_meta_list_from_season_flow_url(
-    mikan_client: &MikanClient,
+    _mikan_client: &MikanClient,
     ctx: Arc<dyn AppContextTrait>,
     mikan_season_flow_url: Url,
     credential_id: i32,
 ) -> RecorderResult<Vec<MikanBangumiMeta>> {
-    let mikan_client = mikan_client
-        .fork_with_credential(ctx.clone(), credential_id)
-        .await?;
+    let stream = scrape_mikan_bangumi_meta_stream_from_season_flow_url(
+        ctx,
+        mikan_season_flow_url,
+        credential_id,
+    );
-    let mikan_base_url = mikan_client.base_url();
-    let content = fetch_html(&mikan_client, mikan_season_flow_url.clone()).await?;
-    let mut bangumi_indices_meta = {
-        let html = Html::parse_document(&content);
-        extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, mikan_base_url)
-    };
+    pin_mut!(stream);
-    if bangumi_indices_meta.is_empty() && !mikan_client.has_login().await? {
-        mikan_client.login().await?;
-        let content = fetch_html(&mikan_client, mikan_season_flow_url).await?;
-        let html = Html::parse_document(&content);
-        bangumi_indices_meta =
-            extract_mikan_bangumi_index_meta_list_from_season_flow_fragment(&html, mikan_base_url);
-    }
-    let mut bangumi_metas = vec![];
-    mikan_client
-        .sync_credential_cookies(ctx.clone(), credential_id)
-        .await?;
-    for bangumi_index in bangumi_indices_meta {
-        let bangumi_title = bangumi_index.bangumi_title.clone();
-        let bangumi_expand_subscribed_fragment_url = build_mikan_bangumi_expand_subscribed_url(
-            mikan_base_url.clone(),
-            &bangumi_index.mikan_bangumi_id,
-        );
-        let bangumi_expand_subscribed_fragment =
-            fetch_html(&mikan_client, bangumi_expand_subscribed_fragment_url).await?;
-        let bangumi_meta = {
-            let html = Html::parse_document(&bangumi_expand_subscribed_fragment);
-            extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
-                &html,
-                bangumi_index,
-                mikan_base_url.clone(),
-            )
-            .with_whatever_context::<_, String, RecorderError>(|| {
-                format!("failed to extract mikan bangumi fansub of title = {bangumi_title}")
-            })
-        }?;
-        bangumi_metas.push(bangumi_meta);
-    }
-    mikan_client
-        .sync_credential_cookies(ctx, credential_id)
-        .await?;
+    let bangumi_metas = stream.try_collect().await?;
Ok(bangumi_metas)
}

View File

@ -0,0 +1,5 @@
use async_graphql::{Context, dynamic::SchemaBuilder};
pub fn register_mikan_scrape_season_subscription_mutation(builder: SchemaBuilder) {
}

View File

@ -1 +1 @@
mod mikan_scrape_season_subscription;

View File

@ -111,6 +111,10 @@ pub fn schema(
&mut context,
&subscription_episode::Column::SubscriberId,
);
restrict_subscriber_for_entity::<subscriber_tasks::Entity>(
&mut context,
&subscriber_tasks::Column::SubscriberId,
);
for column in subscribers::Column::iter() {
if !matches!(column, subscribers::Column::Id) {
restrict_filter_input_for_entity::<subscribers::Entity>(
@ -151,7 +155,8 @@ pub fn schema(
episodes,
subscription_bangumi,
subscription_episode,
-            subscriptions
+            subscriptions,
+            subscriber_tasks,
]
);
@ -160,6 +165,7 @@ pub fn schema(
builder.register_enumeration::<subscriptions::SubscriptionCategory>();
builder.register_enumeration::<downloaders::DownloaderCategory>();
builder.register_enumeration::<downloads::DownloadMime>();
builder.register_enumeration::<subscriber_tasks::SubscriberTaskType>();
}
let schema = builder.schema_builder();

View File

@ -20,10 +20,11 @@ pub mod errors;
pub mod extract;
pub mod graphql;
pub mod logger;
+pub mod message;
 pub mod migrations;
 pub mod models;
 pub mod storage;
-pub mod tasks;
+pub mod task;
#[cfg(test)]
pub mod test_utils;
pub mod web;

View File

@ -0,0 +1,4 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MessageConfig {}

View File

@ -0,0 +1,5 @@
mod config;
mod service;
pub use config::MessageConfig;
pub use service::MessageService;

View File

@ -0,0 +1,12 @@
use super::MessageConfig;
use crate::errors::RecorderResult;
pub struct MessageService {
pub config: MessageConfig,
}
impl MessageService {
pub async fn from_config(config: MessageConfig) -> RecorderResult<Self> {
Ok(Self { config })
}
}

View File

@ -150,6 +150,18 @@ pub enum Credential3rd {
UserAgent,
}
#[derive(DeriveIden)]
pub enum SubscriberTasks {
Table,
Id,
SubscriberId,
TaskType,
Request,
Result,
Error,
Yields,
}
macro_rules! create_postgres_enum_for_active_enum {
($manager: expr, $active_enum: expr, $($enum_value:expr),+) => {
{

View File

@ -338,7 +338,12 @@ impl MigrationTrait for Migration {
.await?;
manager
-            .drop_table(Table::drop().table(SubscriptionEpisode::Table).to_owned())
+            .drop_table(
+                Table::drop()
+                    .if_exists()
+                    .table(SubscriptionEpisode::Table)
+                    .to_owned(),
+            )
.await?;
manager
@ -346,7 +351,7 @@ impl MigrationTrait for Migration {
.await?;
manager
-            .drop_table(Table::drop().table(Episodes::Table).to_owned())
+            .drop_table(Table::drop().if_exists().table(Episodes::Table).to_owned())
.await?;
manager
@ -360,7 +365,12 @@ impl MigrationTrait for Migration {
.await?;
manager
-            .drop_table(Table::drop().table(SubscriptionBangumi::Table).to_owned())
+            .drop_table(
+                Table::drop()
+                    .if_exists()
+                    .table(SubscriptionBangumi::Table)
+                    .to_owned(),
+            )
.await?;
manager
@ -368,7 +378,7 @@ impl MigrationTrait for Migration {
.await?;
manager
-            .drop_table(Table::drop().table(Bangumi::Table).to_owned())
+            .drop_table(Table::drop().if_exists().table(Bangumi::Table).to_owned())
.await?;
manager
@ -379,7 +389,12 @@ impl MigrationTrait for Migration {
.await?;
manager
-            .drop_table(Table::drop().table(Subscriptions::Table).to_owned())
+            .drop_table(
+                Table::drop()
+                    .if_exists()
+                    .table(Subscriptions::Table)
+                    .to_owned(),
+            )
.await?;
manager
@ -387,7 +402,12 @@ impl MigrationTrait for Migration {
.await?;
manager
-            .drop_table(Table::drop().table(Subscribers::Table).to_owned())
+            .drop_table(
+                Table::drop()
+                    .if_exists()
+                    .table(Subscribers::Table)
+                    .to_owned(),
+            )
.await?;
manager

View File

@ -88,7 +88,7 @@ impl MigrationTrait for Migration {
.col(enumeration(
Downloads::Status,
DownloadStatusEnum,
-                        DownloadMime::iden_values(),
+                        DownloadStatus::iden_values(),
))
.col(enumeration(
Downloads::Mime,
@ -158,7 +158,7 @@ impl MigrationTrait for Migration {
.await?;
manager
-            .drop_table(Table::drop().table(Downloads::Table).to_owned())
+            .drop_table(Table::drop().if_exists().table(Downloads::Table).to_owned())
.await?;
manager
@ -174,7 +174,12 @@ impl MigrationTrait for Migration {
.await?;
manager
-            .drop_table(Table::drop().table(Downloaders::Table).to_owned())
+            .drop_table(
+                Table::drop()
+                    .if_exists()
+                    .table(Downloaders::Table)
+                    .to_owned(),
+            )
.await?;
manager

View File

@ -1,103 +0,0 @@
use sea_orm_migration::{prelude::*, schema::*};
use super::defs::table_auto_z;
use crate::{
migrations::defs::{CustomSchemaManagerExt, Downloaders, GeneralIds, Subscribers},
models::downloaders::{DownloaderCategory, DownloaderCategoryEnum},
};
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
create_postgres_enum_for_active_enum!(
manager,
DownloaderCategoryEnum,
DownloaderCategory::QBittorrent
)
.await?;
manager
.create_table(
table_auto_z(Downloaders::Table)
.col(pk_auto(Downloaders::Id))
.col(text(Downloaders::Endpoint))
.col(string_null(Downloaders::Username))
.col(string_null(Downloaders::Password))
.col(enumeration(
Downloaders::Category,
DownloaderCategoryEnum,
DownloaderCategory::iden_values(),
))
.col(text(Downloaders::SavePath))
.col(integer(Downloaders::SubscriberId))
.foreign_key(
ForeignKey::create()
.name("fk_downloader_subscriber_id")
.from_tbl(Downloaders::Table)
.from_col(Downloaders::SubscriberId)
.to_tbl(Subscribers::Table)
.to_col(Subscribers::Id)
.on_delete(ForeignKeyAction::Cascade)
.on_update(ForeignKeyAction::Restrict),
)
.to_owned(),
)
.await?;
manager
.create_postgres_auto_update_ts_trigger_for_col(
Downloaders::Table,
GeneralIds::UpdatedAt,
)
.await?;
manager
.alter_table(
Table::alter()
.table(Subscribers::Table)
.add_column_if_not_exists(integer_null(Subscribers::DownloaderId))
.add_foreign_key(
TableForeignKey::new()
.name("fk_subscribers_downloader_id")
.from_tbl(Subscribers::Table)
.from_col(Subscribers::DownloaderId)
.to_tbl(Downloaders::Table)
.to_col(Downloaders::Id)
.on_delete(ForeignKeyAction::SetNull)
.on_update(ForeignKeyAction::Restrict),
)
.to_owned(),
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Subscribers::Table)
.drop_foreign_key(Alias::new("fk_subscribers_downloader_id"))
.drop_column(Subscribers::DownloaderId)
.to_owned(),
)
.await?;
manager
.drop_postgres_auto_update_ts_trigger_for_col(Downloaders::Table, GeneralIds::UpdatedAt)
.await?;
manager
.drop_table(Table::drop().table(Downloaders::Table).to_owned())
.await?;
manager
.drop_postgres_enum_for_active_enum(DownloaderCategoryEnum)
.await?;
Ok(())
}
}

View File

@ -52,6 +52,7 @@ impl MigrationTrait for Migration {
manager
.create_index(
Index::create()
.if_not_exists()
.name("idx_auth_pid_auth_type")
.unique()
.table(Auth::Table)
@ -102,7 +103,7 @@ impl MigrationTrait for Migration {
.await?;
manager
-            .drop_table(Table::drop().table(Auth::Table).to_owned())
+            .drop_table(Table::drop().if_exists().table(Auth::Table).to_owned())
.await?;
manager

View File

@ -48,6 +48,7 @@ impl MigrationTrait for Migration {
manager
.create_index(
Index::create()
.if_not_exists()
.name("idx_credential_3rd_credential_type")
.table(Credential3rd::Table)
.col(Credential3rd::CredentialType)
@ -95,7 +96,19 @@ impl MigrationTrait for Migration {
.await?;
manager
-            .drop_table(Table::drop().table(Credential3rd::Table).to_owned())
+            .drop_postgres_auto_update_ts_trigger_for_col(
+                Credential3rd::Table,
+                GeneralIds::UpdatedAt,
+            )
             .await?;
+        manager
+            .drop_table(
+                Table::drop()
+                    .if_exists()
+                    .table(Credential3rd::Table)
+                    .to_owned(),
+            )
+            .await?;
manager

View File

@ -0,0 +1,81 @@
use sea_orm_migration::{
prelude::*,
schema::{array, enumeration, integer, json_binary, json_binary_null, pk_auto},
};
use super::defs::{SubscriberTasks, Subscribers, table_auto_z};
use crate::models::subscriber_tasks::{SubscriberTaskType, SubscriberTaskTypeEnum};
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
table_auto_z(SubscriberTasks::Table)
.col(pk_auto(SubscriberTasks::Id))
.col(integer(SubscriberTasks::SubscriberId))
.col(enumeration(
SubscriberTasks::TaskType,
SubscriberTaskTypeEnum,
SubscriberTaskType::iden_values(),
))
.col(json_binary(SubscriberTasks::Request))
.col(json_binary_null(SubscriberTasks::Result))
.col(json_binary_null(SubscriberTasks::Error))
.col(
array(SubscriberTasks::Yields, ColumnType::JsonBinary)
.default(SimpleExpr::Custom(String::from("ARRAY[]::jsonb[]"))),
)
.foreign_key(
ForeignKey::create()
.name("fk_subscriber_tasks_subscriber_id")
.from_tbl(SubscriberTasks::Table)
.from_col(SubscriberTasks::SubscriberId)
.to_tbl(Subscribers::Table)
.to_col(Subscribers::Id)
.on_delete(ForeignKeyAction::Cascade)
.on_update(ForeignKeyAction::Cascade),
)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.if_not_exists()
.name("idx_subscriber_tasks_task_type")
.table(SubscriberTasks::Table)
.col(SubscriberTasks::TaskType)
.to_owned(),
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_index(
Index::drop()
.if_exists()
.name("idx_subscriber_tasks_task_type")
.table(SubscriberTasks::Table)
.to_owned(),
)
.await?;
manager
.drop_table(
Table::drop()
.if_exists()
.table(SubscriberTasks::Table)
.to_owned(),
)
.await?;
Ok(())
}
}

View File

@ -5,9 +5,9 @@ pub use sea_orm_migration::prelude::*;
pub mod defs;
pub mod m20220101_000001_init;
pub mod m20240224_082543_add_downloads;
-pub mod m20240225_060853_subscriber_add_downloader;
 pub mod m20241231_000001_auth;
 pub mod m20250501_021523_credential_3rd;
+pub mod m20250508_022044_subscriber_tasks;
pub struct Migrator;
@ -17,9 +17,9 @@ impl MigratorTrait for Migrator {
vec![
Box::new(m20220101_000001_init::Migration),
Box::new(m20240224_082543_add_downloads::Migration),
-            Box::new(m20240225_060853_subscriber_add_downloader::Migration),
             Box::new(m20241231_000001_auth::Migration),
             Box::new(m20250501_021523_credential_3rd::Migration),
+            Box::new(m20250508_022044_subscriber_tasks::Migration),
]
}
}

View File

@ -5,6 +5,7 @@ pub mod downloaders;
pub mod downloads;
pub mod episodes;
pub mod query;
pub mod subscriber_tasks;
pub mod subscribers;
pub mod subscription_bangumi;
pub mod subscription_episode;

View File

@ -0,0 +1,153 @@
use std::sync::Arc;
use sea_orm::{ActiveValue, FromJsonQueryResult, JsonValue, TryIntoModel, prelude::*};
use serde::{Deserialize, Serialize};
pub use crate::task::{SubscriberTaskType, SubscriberTaskTypeEnum};
use crate::{app::AppContextTrait, errors::RecorderResult, task::SubscriberTask};
#[derive(Debug, Clone, Serialize, Deserialize, FromJsonQueryResult, PartialEq, Eq)]
pub struct SubscriberTaskErrorSnapshot {
pub message: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, DeriveEntityModel, PartialEq, Eq)]
#[sea_orm(table_name = "subscriber_tasks")]
pub struct Model {
#[sea_orm(default_expr = "Expr::current_timestamp()")]
pub created_at: DateTimeUtc,
#[sea_orm(default_expr = "Expr::current_timestamp()")]
pub updated_at: DateTimeUtc,
#[sea_orm(primary_key)]
pub id: i32,
pub subscriber_id: i32,
pub task_type: SubscriberTaskType,
pub request: JsonValue,
pub yields: Vec<JsonValue>,
pub result: Option<JsonValue>,
pub error: Option<JsonValue>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::subscribers::Entity",
from = "Column::SubscriberId",
to = "super::subscribers::Column::Id"
)]
Subscriber,
}
impl Related<super::subscribers::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscriber.def()
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
pub enum RelatedEntity {
#[sea_orm(entity = "super::subscribers::Entity")]
Subscriber,
}
impl ActiveModelBehavior for ActiveModel {}
impl Model {
pub async fn update_result<R>(
ctx: Arc<dyn AppContextTrait>,
task_id: i32,
result: R,
) -> RecorderResult<()>
where
R: Serialize,
{
let db = ctx.db();
let result_value = serde_json::to_value(result)?;
Entity::update_many()
.filter(Column::Id.eq(task_id))
.set(ActiveModel {
result: ActiveValue::Set(Some(result_value)),
..Default::default()
})
.exec(db)
.await?;
Ok(())
}
pub async fn update_error(
ctx: Arc<dyn AppContextTrait>,
task_id: i32,
error: SubscriberTaskErrorSnapshot,
) -> RecorderResult<()> {
let db = ctx.db();
let error_value = serde_json::to_value(&error)?;
Entity::update_many()
.filter(Column::Id.eq(task_id))
.set(ActiveModel {
error: ActiveValue::Set(Some(error_value)),
..Default::default()
})
.exec(db)
.await?;
Ok(())
}
pub async fn append_yield<Y>(
ctx: Arc<dyn AppContextTrait>,
task_id: i32,
item: Y,
) -> RecorderResult<()>
where
Y: Serialize,
{
let db = ctx.db();
let yield_value = serde_json::to_value(item)?;
Entity::update_many()
.filter(Column::Id.eq(task_id))
            .col_expr(
                Column::Yields,
                // array_append takes (array, element); the column must be named
                Expr::cust_with_values("array_append(yields, $1)", [yield_value]),
            )
.exec(db)
.await?;
Ok(())
}
pub async fn add_subscriber_task(
ctx: Arc<dyn AppContextTrait>,
subscriber_id: i32,
task_type: SubscriberTaskType,
request: JsonValue,
) -> RecorderResult<Model> {
let am = ActiveModel {
subscriber_id: ActiveValue::Set(subscriber_id),
task_type: ActiveValue::Set(task_type.clone()),
request: ActiveValue::Set(request.clone()),
..Default::default()
};
let db = ctx.db();
let model = am.insert(db).await?.try_into_model()?;
let task_value: SubscriberTask = serde_json::from_value(serde_json::json!({
"id": model.id,
"subscriber_id": model.subscriber_id.clone(),
"task_type": model.task_type.clone(),
"request": model.request.clone(),
}))?;
ctx.task().add_subscriber_task(task_value).await?;
Ok(model)
}
}
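A sketch of the intended call site (for example a future GraphQL mutation): one call persists the row and pushes the job onto the apalis queue. The request payload below is an assumption based on `MikanScrapeSeasonSubscriptionTask`'s fields:

use std::sync::Arc;

use serde_json::json;

use crate::{app::AppContextTrait, errors::RecorderResult};

async fn enqueue_season_scrape(
    ctx: Arc<dyn AppContextTrait>,
    subscriber_id: i32,
    credential_id: i32,
) -> RecorderResult<Model> {
    Model::add_subscriber_task(
        ctx,
        subscriber_id,
        SubscriberTaskType::MikanScrapeSeasonSubscription,
        // assumed shape; field names mirror MikanScrapeSeasonSubscriptionTask
        json!({
            "year": 2025,
            "season_str": "spring",
            "credential_id": credential_id,
            "subscriber_id": subscriber_id,
        }),
    )
    .await
}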

View File

@ -39,6 +39,8 @@ pub enum Relation {
Episode,
#[sea_orm(has_many = "super::auth::Entity")]
Auth,
#[sea_orm(has_many = "super::subscriber_tasks::Entity")]
SubscriberTasks,
}
impl Related<super::subscriptions::Entity> for Entity {
@ -71,6 +73,12 @@ impl Related<super::auth::Entity> for Entity {
}
}
impl Related<super::subscriber_tasks::Entity> for Entity {
fn to() -> RelationDef {
Relation::SubscriberTasks.def()
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
pub enum RelatedEntity {
#[sea_orm(entity = "super::subscriptions::Entity")]
@ -81,6 +89,8 @@ pub enum RelatedEntity {
Bangumi,
#[sea_orm(entity = "super::episodes::Entity")]
Episode,
#[sea_orm(entity = "super::subscriber_tasks::Entity")]
SubscriberTasks,
}
#[derive(Debug, Deserialize, Serialize)]

View File

@ -147,8 +147,6 @@ pub enum RelatedEntity {
SubscriptionEpisode,
#[sea_orm(entity = "super::subscription_bangumi::Entity")]
SubscriptionBangumi,
-    #[sea_orm(entity = "super::credential_3rd::Entity")]
-    Credential3rd,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]

View File

@ -0,0 +1,81 @@
use std::sync::Arc;
use futures::{Stream, TryStreamExt, pin_mut};
use serde::{Serialize, de::DeserializeOwned};
use crate::{
app::AppContextTrait,
errors::RecorderResult,
models::subscriber_tasks::{self, SubscriberTaskErrorSnapshot},
};
pub const SUBSCRIBER_TASK_APALIS_NAME: &str = "subscriber_task";
#[async_trait::async_trait]
pub trait SubscriberAsyncTaskTrait: Serialize + DeserializeOwned + Sized {
type Result: Serialize + DeserializeOwned + Send;
async fn run_async(
self,
ctx: Arc<dyn AppContextTrait>,
id: i32,
) -> RecorderResult<Self::Result>;
async fn run(self, ctx: Arc<dyn AppContextTrait>, id: i32) -> RecorderResult<()> {
match self.run_async(ctx.clone(), id).await {
Ok(result) => {
subscriber_tasks::Model::update_result(ctx, id, result).await?;
}
Err(e) => {
let error_snapshot = SubscriberTaskErrorSnapshot {
message: e.to_string(),
};
subscriber_tasks::Model::update_error(ctx, id, error_snapshot).await?;
return Err(e);
}
}
Ok(())
}
}
#[async_trait::async_trait]
pub trait SubscriberStreamTaskTrait: Serialize + DeserializeOwned + Sized {
type Yield: Serialize + DeserializeOwned + Send;
fn run_stream(
self,
ctx: Arc<dyn AppContextTrait>,
) -> impl Stream<Item = RecorderResult<Self::Yield>> + Send;
async fn run(self, ctx: Arc<dyn AppContextTrait>, id: i32) -> RecorderResult<()> {
let stream = self.run_stream(ctx.clone());
pin_mut!(stream);
loop {
match stream.try_next().await {
Ok(Some(result)) => {
subscriber_tasks::Model::append_yield(ctx.clone(), id, result).await?;
}
Ok(None) => {
subscriber_tasks::Model::update_result(ctx, id, ()).await?;
break;
}
Err(e) => {
let error_snapshot = SubscriberTaskErrorSnapshot {
message: e.to_string(),
};
subscriber_tasks::Model::update_error(ctx, id, error_snapshot).await?;
return Err(e);
}
}
}
Ok(())
}
}
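For contrast with the streaming variant, a minimal implementation of the async trait might look like this; `EchoTask` is a made-up example, not part of the commit:

use std::sync::Arc;

use serde::{Deserialize, Serialize};

use crate::{app::AppContextTrait, errors::RecorderResult};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EchoTask {
    pub message: String,
}

#[async_trait::async_trait]
impl SubscriberAsyncTaskTrait for EchoTask {
    type Result = String;

    // `run_async` only computes the value; the provided `run` method
    // snapshots it into `subscriber_tasks.result` (or `error` on failure).
    async fn run_async(
        self,
        _ctx: Arc<dyn AppContextTrait>,
        _id: i32,
    ) -> RecorderResult<Self::Result> {
        Ok(self.message)
    }
}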

View File

@ -0,0 +1,3 @@
mod scrape_season_subscription;
pub use scrape_season_subscription::MikanScrapeSeasonSubscriptionTask;

View File

@ -0,0 +1,45 @@
use std::sync::Arc;
use futures::Stream;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};
use crate::{
app::AppContextTrait,
errors::RecorderResult,
extract::mikan::{
MikanBangumiMeta, MikanSeasonStr, build_mikan_season_flow_url,
scrape_mikan_bangumi_meta_stream_from_season_flow_url,
},
task::SubscriberStreamTaskTrait,
};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)]
pub struct MikanScrapeSeasonSubscriptionTask {
pub task_id: i32,
pub year: i32,
pub season_str: MikanSeasonStr,
pub credential_id: i32,
pub subscriber_id: i32,
}
#[async_trait::async_trait]
impl SubscriberStreamTaskTrait for MikanScrapeSeasonSubscriptionTask {
type Yield = MikanBangumiMeta;
fn run_stream(
self,
ctx: Arc<dyn AppContextTrait>,
) -> impl Stream<Item = RecorderResult<Self::Yield>> {
let mikan_base_url = ctx.mikan().base_url().clone();
let mikan_season_flow_url =
build_mikan_season_flow_url(mikan_base_url, self.year, self.season_str);
scrape_mikan_bangumi_meta_stream_from_season_flow_url(
ctx.clone(),
mikan_season_flow_url,
self.credential_id,
)
}
}

View File

@ -0,0 +1,13 @@
mod config;
mod core;
pub mod mikan;
mod registry;
mod service;
pub use core::{SUBSCRIBER_TASK_APALIS_NAME, SubscriberAsyncTaskTrait, SubscriberStreamTaskTrait};
pub use config::TaskConfig;
pub use registry::{
SubscriberTask, SubscriberTaskPayload, SubscriberTaskType, SubscriberTaskTypeEnum,
};
pub use service::TaskService;

View File

@ -0,0 +1,33 @@
use sea_orm::{DeriveActiveEnum, DeriveDisplay, prelude::*};
use serde::{Deserialize, Serialize};
use super::mikan::MikanScrapeSeasonSubscriptionTask;
#[derive(
Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize,
)]
#[sea_orm(
rs_type = "String",
db_type = "String(StringLen::None)",
enum_name = "subscriber_task_type"
)]
#[serde(rename_all = "snake_case")]
pub enum SubscriberTaskType {
#[sea_orm(string_value = "mikan_scrape_season_subscription")]
MikanScrapeSeasonSubscription,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "task_type")]
pub enum SubscriberTaskPayload {
#[serde(rename = "mikan_scrape_season_subscription")]
MikanScrapeSeasonSubscription(MikanScrapeSeasonSubscriptionTask),
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubscriberTask {
pub id: i32,
pub subscriber_id: i32,
#[serde(flatten)]
pub payload: SubscriberTaskPayload,
}
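With `#[serde(tag = "task_type")]` on the enum and `#[serde(flatten)]` on the wrapper, a task's own fields serialize at the top level next to `id` and `subscriber_id`, which is the shape `add_subscriber_task` reconstructs. A sketch (values illustrative; the `MikanSeasonStr` variant name is an assumption):

fn example_wire_shape() -> serde_json::Result<serde_json::Value> {
    let task = SubscriberTask {
        id: 1,
        subscriber_id: 42,
        payload: SubscriberTaskPayload::MikanScrapeSeasonSubscription(
            MikanScrapeSeasonSubscriptionTask {
                task_id: 1,
                year: 2025,
                // assumed variant name for the spring season
                season_str: crate::extract::mikan::MikanSeasonStr::Spring,
                credential_id: 7,
                subscriber_id: 42,
            },
        ),
    };
    // => {"id":1,"subscriber_id":42,
    //     "task_type":"mikan_scrape_season_subscription",
    //     "task_id":1,"year":2025,"season_str":"spring","credential_id":7,...}
    serde_json::to_value(&task)
}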

View File

@ -0,0 +1,77 @@
use std::{ops::Deref, sync::Arc};
use apalis::prelude::*;
use apalis_sql::postgres::PostgresStorage;
use tokio::sync::RwLock;
use crate::{
app::AppContextTrait,
errors::RecorderResult,
task::{
SUBSCRIBER_TASK_APALIS_NAME, SubscriberStreamTaskTrait, SubscriberTask,
SubscriberTaskPayload, TaskConfig,
},
};
pub struct TaskService {
pub config: TaskConfig,
ctx: Arc<dyn AppContextTrait>,
subscriber_task_storage: Arc<RwLock<PostgresStorage<SubscriberTask>>>,
}
impl TaskService {
pub async fn from_config_and_ctx(
config: TaskConfig,
ctx: Arc<dyn AppContextTrait>,
) -> RecorderResult<Self> {
let pool = ctx.db().get_postgres_connection_pool().clone();
let subscriber_task_storage = Arc::new(RwLock::new(PostgresStorage::new(pool)));
Ok(Self {
config,
ctx,
subscriber_task_storage,
})
}
async fn run_subscriber_task(
job: SubscriberTask,
data: Data<Arc<dyn AppContextTrait>>,
) -> RecorderResult<()> {
let ctx = data.deref().clone();
match job.payload {
SubscriberTaskPayload::MikanScrapeSeasonSubscription(task) => {
task.run(ctx, job.id).await
}
}
}
pub async fn add_subscriber_task(&self, job: SubscriberTask) -> RecorderResult<()> {
{
let mut storage = self.subscriber_task_storage.write().await;
storage.push(job).await?;
}
Ok(())
}
pub async fn setup(&self) -> RecorderResult<()> {
let monitor = Monitor::new();
let worker = WorkerBuilder::new(SUBSCRIBER_TASK_APALIS_NAME)
.catch_panic()
.enable_tracing()
.data(self.ctx.clone())
.backend({
let storage = self.subscriber_task_storage.read().await;
storage.clone()
})
.build_fn(Self::run_subscriber_task);
let monitor = monitor.register(worker);
monitor.run().await?;
Ok(())
}
}
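`setup` blocks on `Monitor::run`, so the caller must give it its own task next to the web server. A hypothetical bootstrap wiring (not part of this commit):

use std::sync::Arc;

use crate::{app::AppContextTrait, errors::RecorderResult};

async fn run_app(ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
    // The monitor runs until shutdown; spawn it so it does not block startup.
    let workers = {
        let ctx = ctx.clone();
        tokio::spawn(async move { ctx.task().setup().await })
    };
    // ... start the HTTP server here ...
    workers.await.expect("task monitor panicked")?;
    Ok(())
}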

View File

@ -1,6 +0,0 @@
mod scrape_season_subscription;
pub use scrape_season_subscription::{
ScrapeMikanSeasonSubscriptionTask, ScrapeMikanSeasonSubscriptionTaskResult,
register_scrape_mikan_season_subscription_task,
};

View File

@ -1,87 +0,0 @@
use std::{ops::Deref, sync::Arc};
use apalis::prelude::*;
use apalis_sql::postgres::PostgresStorage;
use serde::{Deserialize, Serialize};
use crate::{
app::AppContextTrait,
errors::RecorderResult,
extract::mikan::{
MikanBangumiMeta, MikanSeasonStr, build_mikan_season_flow_url,
scrape_mikan_bangumi_meta_list_from_season_flow_url,
},
};
const TASK_NAME: &str = "mikan_extract_season_subscription";
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ScrapeMikanSeasonSubscriptionTask {
pub task_id: i32,
pub year: i32,
pub season_str: MikanSeasonStr,
pub credential_id: i32,
pub subscription_id: i32,
pub subscriber_id: i32,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ScrapeMikanSeasonSubscriptionTaskResult {
pub task_id: i32,
pub year: i32,
pub season_str: MikanSeasonStr,
pub credential_id: i32,
pub subscription_id: i32,
pub subscriber_id: i32,
pub bangumi_meta_list: Vec<MikanBangumiMeta>,
}
pub async fn scrape_mikan_season_subscription(
job: ScrapeMikanSeasonSubscriptionTask,
data: Data<Arc<dyn AppContextTrait>>,
) -> RecorderResult<GoTo<ScrapeMikanSeasonSubscriptionTaskResult>> {
let ctx = data.deref();
let mikan_client = ctx.mikan();
let mikan_base_url = mikan_client.base_url();
let mikan_season_flow_url =
build_mikan_season_flow_url(mikan_base_url.clone(), job.year, job.season_str);
let bangumi_meta_list = scrape_mikan_bangumi_meta_list_from_season_flow_url(
mikan_client,
ctx.clone(),
mikan_season_flow_url,
job.credential_id,
)
.await?;
Ok(GoTo::Done(ScrapeMikanSeasonSubscriptionTaskResult {
bangumi_meta_list,
credential_id: job.credential_id,
season_str: job.season_str,
subscriber_id: job.subscriber_id,
subscription_id: job.subscription_id,
task_id: job.task_id,
year: job.year,
}))
}
pub fn register_scrape_mikan_season_subscription_task(
monitor: Monitor,
ctx: Arc<dyn AppContextTrait>,
) -> RecorderResult<(Monitor, PostgresStorage<StepRequest<serde_json::Value>>)> {
let pool = ctx.db().get_postgres_connection_pool().clone();
let storage = PostgresStorage::new(pool);
let steps = StepBuilder::new().step_fn(scrape_mikan_season_subscription);
let worker = WorkerBuilder::new(TASK_NAME)
.catch_panic()
.enable_tracing()
.data(ctx)
.backend(storage.clone())
.build_stepped(steps);
Ok((monitor.register(worker), storage))
}

View File

@ -1,6 +0,0 @@
pub mod config;
pub mod mikan;
pub mod service;
pub use config::TaskConfig;
pub use service::TaskService;

View File

@ -1,41 +0,0 @@
use std::{fmt::Debug, sync::Arc};
use apalis::prelude::*;
use apalis_sql::postgres::PostgresStorage;
use tokio::sync::Mutex;
use super::{TaskConfig, mikan::register_scrape_mikan_season_subscription_task};
use crate::{app::AppContextTrait, errors::RecorderResult};
pub struct TaskService {
config: TaskConfig,
#[allow(dead_code)]
monitor: Arc<Mutex<Monitor>>,
pub scrape_mikan_season_subscription_task_storage:
PostgresStorage<StepRequest<serde_json::Value>>,
}
impl TaskService {
pub async fn from_config_and_ctx(
config: TaskConfig,
ctx: Arc<dyn AppContextTrait>,
) -> RecorderResult<Self> {
let monitor = Monitor::new();
let (monitor, scrape_mikan_season_subscription_task_storage) =
register_scrape_mikan_season_subscription_task(monitor, ctx.clone())?;
Ok(Self {
config,
monitor: Arc::new(Mutex::new(monitor)),
scrape_mikan_season_subscription_task_storage,
})
}
}
impl Debug for TaskService {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TaskService")
.field("config", &self.config)
.finish()
}
}

View File

@ -16,7 +16,8 @@ pub struct UnitTestAppContext {
graphql: Option<crate::graphql::GraphQLService>,
storage: Option<crate::storage::StorageService>,
crypto: Option<crate::crypto::CryptoService>,
-    tasks: Option<crate::tasks::TaskService>,
+    task: Option<crate::task::TaskService>,
+    message: Option<crate::message::MessageService>,
#[builder(default = Some(String::from(env!("CARGO_MANIFEST_DIR"))))]
working_dir: Option<String>,
#[builder(default = crate::app::Environment::Testing, setter(!strip_option))]
@ -74,7 +75,11 @@ impl AppContextTrait for UnitTestAppContext {
self.crypto.as_ref().expect("should set crypto")
}
-    fn task(&self) -> &crate::tasks::TaskService {
-        self.tasks.as_ref().expect("should set tasks")
+    fn task(&self) -> &crate::task::TaskService {
+        self.task.as_ref().expect("should set tasks")
     }
+    fn message(&self) -> &crate::message::MessageService {
+        self.message.as_ref().expect("should set message")
}
}

View File

@ -1189,6 +1189,7 @@ export type Subscriptions = {
bangumi: BangumiConnection;
category: SubscriptionCategoryEnum;
createdAt: Scalars['String']['output'];
credentialId?: Maybe<Scalars['Int']['output']>;
displayName: Scalars['String']['output'];
enabled: Scalars['Boolean']['output'];
episode: EpisodesConnection;
@ -1233,6 +1234,7 @@ export type SubscriptionsBasic = {
__typename?: 'SubscriptionsBasic';
category: SubscriptionCategoryEnum;
createdAt: Scalars['String']['output'];
credentialId?: Maybe<Scalars['Int']['output']>;
displayName: Scalars['String']['output'];
enabled: Scalars['Boolean']['output'];
id: Scalars['Int']['output'];
@ -1259,6 +1261,7 @@ export type SubscriptionsFilterInput = {
and?: InputMaybe<Array<SubscriptionsFilterInput>>;
category?: InputMaybe<SubscriptionCategoryEnumFilterInput>;
createdAt?: InputMaybe<TextFilterInput>;
credentialId?: InputMaybe<IntegerFilterInput>;
displayName?: InputMaybe<StringFilterInput>;
enabled?: InputMaybe<BooleanFilterInput>;
id?: InputMaybe<IntegerFilterInput>;
@ -1271,6 +1274,7 @@ export type SubscriptionsFilterInput = {
export type SubscriptionsInsertInput = {
category: SubscriptionCategoryEnum;
createdAt?: InputMaybe<Scalars['String']['input']>;
credentialId?: InputMaybe<Scalars['Int']['input']>;
displayName: Scalars['String']['input'];
enabled: Scalars['Boolean']['input'];
id?: InputMaybe<Scalars['Int']['input']>;
@ -1281,6 +1285,7 @@ export type SubscriptionsInsertInput = {
export type SubscriptionsOrderInput = {
category?: InputMaybe<OrderByEnum>;
createdAt?: InputMaybe<OrderByEnum>;
credentialId?: InputMaybe<OrderByEnum>;
displayName?: InputMaybe<OrderByEnum>;
enabled?: InputMaybe<OrderByEnum>;
id?: InputMaybe<OrderByEnum>;
@ -1292,6 +1297,7 @@ export type SubscriptionsOrderInput = {
export type SubscriptionsUpdateInput = {
category?: InputMaybe<SubscriptionCategoryEnum>;
createdAt?: InputMaybe<Scalars['String']['input']>;
credentialId?: InputMaybe<Scalars['Int']['input']>;
displayName?: InputMaybe<Scalars['String']['input']>;
enabled?: InputMaybe<Scalars['Boolean']['input']>;
id?: InputMaybe<Scalars['Int']['input']>;

View File

@ -19,6 +19,9 @@ dev-proxy:
dev-recorder:
watchexec -r -w apps/recorder -- cargo run -p recorder --bin recorder_cli -- --environment development
+dev-recorder-migrate-down:
+    cargo run -p recorder --bin migrate_down -- --environment development
dev-deps:
docker compose -f devdeps.compose.yaml up