From f94e17508246f9ae58fba2970cf81d4a2d19b7b3 Mon Sep 17 00:00:00 2001 From: lonelyhentxi Date: Sat, 8 Mar 2025 16:43:00 +0800 Subject: [PATCH] feat: add replay-stream-tasks pattern support --- Cargo.lock | 62 +++- apps/recorder/Cargo.toml | 5 +- apps/recorder/examples/playground.rs | 6 +- apps/recorder/src/app/context.rs | 65 +++- apps/recorder/src/app/core.rs | 8 +- apps/recorder/src/app/mod.rs | 2 +- apps/recorder/src/auth/basic.rs | 4 +- apps/recorder/src/auth/middleware.rs | 11 +- apps/recorder/src/auth/oidc.rs | 4 +- apps/recorder/src/auth/service.rs | 16 +- apps/recorder/src/errors/mod.rs | 20 ++ apps/recorder/src/extract/mikan/client.rs | 29 +- .../recorder/src/extract/mikan/web_extract.rs | 276 +++++++++-------- apps/recorder/src/fetch/client/secrecy.rs | 10 +- apps/recorder/src/graphql/mod.rs | 1 + .../recorder/src/graphql/subscriptions/mod.rs | 0 apps/recorder/src/models/auth.rs | 10 +- apps/recorder/src/models/bangumi.rs | 6 +- apps/recorder/src/models/episodes.rs | 10 +- apps/recorder/src/models/mod.rs | 2 + apps/recorder/src/models/subscribers.rs | 12 +- apps/recorder/src/models/subscriptions.rs | 25 +- apps/recorder/src/models/task_stream_item.rs | 62 ++++ apps/recorder/src/models/tasks.rs | 95 ++++++ apps/recorder/src/tasks/core.rs | 285 +++++++++++++++++- ...gumi_subscriptions_from_my_bangumi_page.rs | 49 --- ...gumi_subscriptions_from_my_bangumi_page.rs | 38 +++ apps/recorder/src/tasks/mikan/mod.rs | 1 + apps/recorder/src/tasks/mod.rs | 4 +- apps/recorder/src/tasks/registry.rs | 1 + apps/recorder/src/tasks/service.rs | 1 + apps/recorder/src/test_utils/app.rs | 62 ++++ apps/recorder/src/test_utils/mod.rs | 1 + apps/recorder/src/test_utils/tracing.rs | 4 +- apps/recorder/src/web/controller/core.rs | 24 +- .../src/web/controller/graphql/mod.rs | 10 +- apps/recorder/src/web/controller/oidc/mod.rs | 14 +- .../src/web/middleware/catch_panic.rs | 7 +- .../src/web/middleware/compression.rs | 7 +- apps/recorder/src/web/middleware/cors.rs | 4 +- apps/recorder/src/web/middleware/etag.rs | 7 +- apps/recorder/src/web/middleware/logger.rs | 11 +- apps/recorder/src/web/middleware/mod.rs | 11 +- apps/recorder/src/web/middleware/remote_ip.rs | 7 +- .../recorder/src/web/middleware/request_id.rs | 7 +- .../src/web/middleware/secure_headers.rs | 4 +- apps/recorder/src/web/middleware/timeout.rs | 7 +- 47 files changed, 989 insertions(+), 318 deletions(-) create mode 100644 apps/recorder/src/graphql/subscriptions/mod.rs create mode 100644 apps/recorder/src/models/task_stream_item.rs create mode 100644 apps/recorder/src/models/tasks.rs delete mode 100644 apps/recorder/src/tasks/create_mikan_bangumi_subscriptions_from_my_bangumi_page.rs create mode 100644 apps/recorder/src/tasks/mikan/create_mikan_bangumi_subscriptions_from_my_bangumi_page.rs create mode 100644 apps/recorder/src/tasks/mikan/mod.rs create mode 100644 apps/recorder/src/tasks/registry.rs create mode 100644 apps/recorder/src/tasks/service.rs create mode 100644 apps/recorder/src/test_utils/app.rs diff --git a/Cargo.lock b/Cargo.lock index dca31c8..b4e7590 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,6 +164,12 @@ version = "1.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b964d184e89d9b6b67dd2715bc8e74cf3107fb2b529990c90cf517326150bf4" +[[package]] +name = "append-only-vec" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7992085ec035cfe96992dd31bfd495a2ebd31969bb95f624471cb6c0b349e571" + [[package]] name = "arrayvec" version = "0.7.6" @@ -1242,6 +1248,22 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "ctor" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7747ac3a66a06f4ee6718686c8ea976d2d05fb30ada93ebd76b3f9aef97257c" +dependencies = [ + "ctor-proc-macro", + "dtor", +] + +[[package]] +name = "ctor-proc-macro" +version = "0.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f211af61d8efdd104f96e57adf5e426ba1bc3ed7a4ead616e15e5881fd79c4d" + [[package]] name = "curve25519-dalek" version = "4.1.3" @@ -1501,6 +1523,21 @@ dependencies = [ "dtoa", ] +[[package]] +name = "dtor" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bf39a0bfd1f94d62ffdb2802a7e6244c0f34f6ebacf5d4c26547d08cd1d67a5" +dependencies = [ + "dtor-proc-macro", +] + +[[package]] +name = "dtor-proc-macro" +version = "0.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7454e41ff9012c00d53cf7f475c5e3afa3b91b7c90568495495e8d9bf47a1055" + [[package]] name = "dyn-clone" version = "1.0.18" @@ -3008,6 +3045,16 @@ dependencies = [ "mutate_once", ] +[[package]] +name = "kanal" +version = "0.1.0-pre8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05d55519627edaf7fd0f29981f6dc03fb52df3f5b257130eb8d0bf2801ea1d7" +dependencies = [ + "futures-core", + "lock_api", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -4491,6 +4538,7 @@ name = "recorder" version = "0.1.0" dependencies = [ "anyhow", + "append-only-vec", "async-graphql", "async-graphql-axum", "async-stream", @@ -4504,6 +4552,7 @@ dependencies = [ "clap", "color-eyre", "cookie", + "ctor", "dotenv", "fancy-regex", "fastrand", @@ -4519,6 +4568,7 @@ dependencies = [ "ipnetwork", "itertools 0.14.0", "jwt-authorizer", + "kanal", "lazy_static", "leaky-bucket", "librqbit-core", @@ -4544,7 +4594,6 @@ dependencies = [ "sea-orm", "sea-orm-migration", "seaography", - "secrecy", "serde", "serde_json", "serde_variant", @@ -4561,6 +4610,7 @@ dependencies = [ "tracing", "tracing-appender", "tracing-subscriber", + "typed-builder", "url", "uuid", "zune-image", @@ -5291,16 +5341,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "secrecy" -version = "0.10.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e891af845473308773346dc847b2c23ee78fe442e0472ac50e22a18a93d3ae5a" -dependencies = [ - "serde", - "zeroize", -] - [[package]] name = "security-framework" version = "2.11.1" diff --git a/apps/recorder/Cargo.toml b/apps/recorder/Cargo.toml index 9c454a1..0f1ff27 100644 --- a/apps/recorder/Cargo.toml +++ b/apps/recorder/Cargo.toml @@ -121,7 +121,6 @@ http-cache = { version = "0.20.0", features = [ http-cache-semantics = "2.1.0" dotenv = "0.15.0" nom = "8.0.0" -secrecy = { version = "0.10.3", features = ["serde"] } http = "1.2.0" cookie = "0.18.1" async-stream = "0.3.6" @@ -130,6 +129,10 @@ tracing-appender = "0.2.3" clap = "4.5.31" futures-util = "0.3.31" ipnetwork = "0.21.1" +kanal = "0.1.0-pre8" +append-only-vec = "0.1.7" +typed-builder = "0.20.0" +ctor = "0.4.0" [dev-dependencies] serial_test = "3" diff --git a/apps/recorder/examples/playground.rs b/apps/recorder/examples/playground.rs index 7ebd3fe..3ece8a2 100644 --- a/apps/recorder/examples/playground.rs +++ b/apps/recorder/examples/playground.rs @@ -19,7 +19,7 @@ // use sea_orm::ColumnTrait; // use sea_orm_migration::MigratorTrait; -// async fn pull_mikan_bangumi_rss(ctx: &AppContext) -> RResult<()> { +// async fn pull_mikan_bangumi_rss(ctx: &dyn AppContextTrait) -> RResult<()> { // let rss_link = "https://mikanani.me/RSS/Bangumi?bangumiId=3416&subgroupid=370"; // // let rss_link = @@ -27,7 +27,7 @@ // let subscription = if let Some(subscription) = // subscriptions::Entity::find() // .filter(subscriptions::Column::SourceUrl.eq(String::from(rss_link))) -// .one(&ctx.db) +// .one(ctx.db()) // .await? // { // subscription @@ -55,7 +55,7 @@ // let BootResult { // app_context: ctx, .. // } = loco_rs::boot::run_app::(&StartMode::ServerOnly, ctx).await?; -// Migrator::up(&ctx.db, None).await?; +// Migrator::up(ctx.db(), None).await?; // Ok(ctx) // } diff --git a/apps/recorder/src/app/context.rs b/apps/recorder/src/app/context.rs index 584f5b2..8a8d479 100644 --- a/apps/recorder/src/app/context.rs +++ b/apps/recorder/src/app/context.rs @@ -5,17 +5,30 @@ use crate::{ storage::StorageService, }; +pub trait AppContextTrait: Send + Sync { + fn logger(&self) -> &LoggerService; + fn db(&self) -> &DatabaseService; + fn config(&self) -> &AppConfig; + fn cache(&self) -> &CacheService; + fn mikan(&self) -> &MikanClient; + fn auth(&self) -> &AuthService; + fn graphql(&self) -> &GraphQLService; + fn storage(&self) -> &StorageService; + fn working_dir(&self) -> &String; + fn environment(&self) -> &Environment; +} + pub struct AppContext { - pub logger: LoggerService, - pub db: DatabaseService, - pub config: AppConfig, - pub cache: CacheService, - pub mikan: MikanClient, - pub auth: AuthService, - pub graphql: GraphQLService, - pub storage: StorageService, - pub working_dir: String, - pub environment: Environment, + logger: LoggerService, + db: DatabaseService, + config: AppConfig, + cache: CacheService, + mikan: MikanClient, + auth: AuthService, + graphql: GraphQLService, + storage: StorageService, + working_dir: String, + environment: Environment, } impl AppContext { @@ -48,3 +61,35 @@ impl AppContext { }) } } +impl AppContextTrait for AppContext { + fn logger(&self) -> &LoggerService { + &self.logger + } + fn db(&self) -> &DatabaseService { + &self.db + } + fn config(&self) -> &AppConfig { + &self.config + } + fn cache(&self) -> &CacheService { + &self.cache + } + fn mikan(&self) -> &MikanClient { + &self.mikan + } + fn auth(&self) -> &AuthService { + &self.auth + } + fn graphql(&self) -> &GraphQLService { + &self.graphql + } + fn storage(&self) -> &StorageService { + &self.storage + } + fn working_dir(&self) -> &String { + &self.working_dir + } + fn environment(&self) -> &Environment { + &self.environment + } +} diff --git a/apps/recorder/src/app/core.rs b/apps/recorder/src/app/core.rs index 55d13d2..d11b687 100644 --- a/apps/recorder/src/app/core.rs +++ b/apps/recorder/src/app/core.rs @@ -4,7 +4,7 @@ use axum::Router; use futures::try_join; use tokio::signal; -use super::{builder::AppBuilder, context::AppContext}; +use super::{builder::AppBuilder, context::AppContextTrait}; use crate::{ errors::RResult, web::{ @@ -14,7 +14,7 @@ use crate::{ }; pub struct App { - pub context: Arc, + pub context: Arc, pub builder: AppBuilder, } @@ -25,14 +25,14 @@ impl App { pub async fn serve(&self) -> RResult<()> { let context = &self.context; - let config = &context.config; + let config = context.config(); let listener = tokio::net::TcpListener::bind(&format!( "{}:{}", config.server.binding, config.server.port )) .await?; - let mut router = Router::>::new(); + let mut router = Router::>::new(); let (graphqlc, oidcc) = try_join!( controller::graphql::create(context.clone()), diff --git a/apps/recorder/src/app/mod.rs b/apps/recorder/src/app/mod.rs index 838b404..50b39bd 100644 --- a/apps/recorder/src/app/mod.rs +++ b/apps/recorder/src/app/mod.rs @@ -8,5 +8,5 @@ pub use core::App; pub use builder::AppBuilder; pub use config::AppConfig; -pub use context::AppContext; +pub use context::{AppContext, AppContextTrait}; pub use env::Environment; diff --git a/apps/recorder/src/auth/basic.rs b/apps/recorder/src/auth/basic.rs index 9081a49..2ab33c5 100644 --- a/apps/recorder/src/auth/basic.rs +++ b/apps/recorder/src/auth/basic.rs @@ -9,7 +9,7 @@ use super::{ service::{AuthServiceTrait, AuthUserInfo}, }; use crate::{ - app::AppContext, + app::AppContextTrait, models::{auth::AuthType, subscribers::SEED_SUBSCRIBER}, }; @@ -64,7 +64,7 @@ pub struct BasicAuthService { impl AuthServiceTrait for BasicAuthService { async fn extract_user_info( &self, - ctx: &AppContext, + ctx: &dyn AppContextTrait, request: &mut Parts, ) -> Result { if let Ok(AuthBasic { diff --git a/apps/recorder/src/auth/middleware.rs b/apps/recorder/src/auth/middleware.rs index c9a098f..3b12b0e 100644 --- a/apps/recorder/src/auth/middleware.rs +++ b/apps/recorder/src/auth/middleware.rs @@ -7,18 +7,21 @@ use axum::{ response::{IntoResponse, Response}, }; -use crate::{app::AppContext, auth::AuthServiceTrait}; +use crate::{app::AppContextTrait, auth::AuthServiceTrait}; pub async fn header_www_authenticate_middleware( - State(ctx): State>, + State(ctx): State>, request: Request, next: Next, ) -> Response { - let auth_service = &ctx.auth; + let auth_service = ctx.auth(); let (mut parts, body) = request.into_parts(); - let mut response = match auth_service.extract_user_info(&ctx, &mut parts).await { + let mut response = match auth_service + .extract_user_info(ctx.as_ref() as &dyn AppContextTrait, &mut parts) + .await + { Ok(auth_user_info) => { let mut request = Request::from_parts(parts, body); request.extensions_mut().insert(auth_user_info); diff --git a/apps/recorder/src/auth/oidc.rs b/apps/recorder/src/auth/oidc.rs index 8528b2c..13ec21f 100644 --- a/apps/recorder/src/auth/oidc.rs +++ b/apps/recorder/src/auth/oidc.rs @@ -23,7 +23,7 @@ use super::{ errors::AuthError, service::{AuthServiceTrait, AuthUserInfo}, }; -use crate::{app::AppContext, errors::RError, fetch::HttpClient, models::auth::AuthType}; +use crate::{app::AppContextTrait, errors::RError, fetch::HttpClient, models::auth::AuthType}; #[derive(Deserialize, Serialize, Clone, Debug)] pub struct OidcAuthClaims { @@ -261,7 +261,7 @@ impl OidcAuthService { impl AuthServiceTrait for OidcAuthService { async fn extract_user_info( &self, - ctx: &AppContext, + ctx: &dyn AppContextTrait, request: &mut Parts, ) -> Result { let config = &self.config; diff --git a/apps/recorder/src/auth/service.rs b/apps/recorder/src/auth/service.rs index c5609b8..0274c8a 100644 --- a/apps/recorder/src/auth/service.rs +++ b/apps/recorder/src/auth/service.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use axum::{ @@ -17,7 +17,7 @@ use super::{ oidc::{OidcAuthClaims, OidcAuthService}, }; use crate::{ - app::AppContext, + app::AppContextTrait, fetch::{ HttpClient, HttpClientConfig, client::{HttpClientCacheBackendConfig, HttpClientCachePresetConfig}, @@ -31,17 +31,17 @@ pub struct AuthUserInfo { pub auth_type: AuthType, } -impl FromRequestParts for AuthUserInfo { +impl FromRequestParts> for AuthUserInfo { type Rejection = Response; async fn from_request_parts( parts: &mut Parts, - state: &AppContext, + state: &Arc, ) -> Result { - let auth_service = &state.auth; + let auth_service = state.auth(); auth_service - .extract_user_info(state, parts) + .extract_user_info(state.as_ref(), parts) .await .map_err(|err| err.into_response()) } @@ -51,7 +51,7 @@ impl FromRequestParts for AuthUserInfo { pub trait AuthServiceTrait { async fn extract_user_info( &self, - ctx: &AppContext, + ctx: &dyn AppContextTrait, request: &mut Parts, ) -> Result; fn www_authenticate_header_value(&self) -> Option; @@ -104,7 +104,7 @@ impl AuthService { impl AuthServiceTrait for AuthService { async fn extract_user_info( &self, - ctx: &AppContext, + ctx: &dyn AppContextTrait, request: &mut Parts, ) -> Result { match self { diff --git a/apps/recorder/src/errors/mod.rs b/apps/recorder/src/errors/mod.rs index a4f4bd4..b19cec0 100644 --- a/apps/recorder/src/errors/mod.rs +++ b/apps/recorder/src/errors/mod.rs @@ -2,6 +2,7 @@ use std::{borrow::Cow, error::Error as StdError}; use axum::response::{IntoResponse, Response}; use http::StatusCode; +use serde::{Deserialize, Deserializer, Serialize}; use thiserror::Error as ThisError; use crate::{auth::AuthError, fetch::HttpClientError}; @@ -113,4 +114,23 @@ impl IntoResponse for RError { } } +impl Serialize for RError { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +impl<'de> Deserialize<'de> for RError { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Ok(Self::CustomMessageString(s)) + } +} + pub type RResult = Result; diff --git a/apps/recorder/src/extract/mikan/client.rs b/apps/recorder/src/extract/mikan/client.rs index 4da3dd6..52d83d1 100644 --- a/apps/recorder/src/extract/mikan/client.rs +++ b/apps/recorder/src/extract/mikan/client.rs @@ -1,7 +1,7 @@ -use std::ops::Deref; +use std::{fmt::Debug, ops::Deref}; use reqwest_middleware::ClientWithMiddleware; -use secrecy::{ExposeSecret, SecretString}; +use serde::{Deserialize, Serialize}; use url::Url; use super::MikanConfig; @@ -10,15 +10,24 @@ use crate::{ fetch::{HttpClient, HttpClientTrait, client::HttpClientCookiesAuth}, }; -#[derive(Debug, Default, Clone)] +#[derive(Default, Clone, Deserialize, Serialize)] pub struct MikanAuthSecrecy { - pub cookie: SecretString, + pub cookie: String, pub user_agent: Option, } +impl Debug for MikanAuthSecrecy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MikanAuthSecrecy") + .field("cookie", &String::from("[secrecy]")) + .field("user_agent", &String::from("[secrecy]")) + .finish() + } +} + impl MikanAuthSecrecy { pub fn into_cookie_auth(self, url: &Url) -> Result { - HttpClientCookiesAuth::from_cookies(self.cookie.expose_secret(), url, self.user_agent) + HttpClientCookiesAuth::from_cookies(&self.cookie, url, self.user_agent) } } @@ -38,9 +47,13 @@ impl MikanClient { }) } - pub fn fork_with_auth(&self, secrecy: MikanAuthSecrecy) -> Result { - let cookie_auth = secrecy.into_cookie_auth(&self.base_url)?; - let fork = self.http_client.fork().attach_secrecy(cookie_auth); + pub fn fork_with_auth(&self, secrecy: Option) -> Result { + let mut fork = self.http_client.fork(); + + if let Some(secrecy) = secrecy { + let cookie_auth = secrecy.into_cookie_auth(&self.base_url)?; + fork = fork.attach_secrecy(cookie_auth); + } Ok(Self { http_client: HttpClient::from_fork(fork)?, diff --git a/apps/recorder/src/extract/mikan/web_extract.rs b/apps/recorder/src/extract/mikan/web_extract.rs index c0709a6..e7e3fd4 100644 --- a/apps/recorder/src/extract/mikan/web_extract.rs +++ b/apps/recorder/src/extract/mikan/web_extract.rs @@ -1,18 +1,20 @@ -use std::borrow::Cow; +use std::{borrow::Cow, sync::Arc}; use async_stream::try_stream; use bytes::Bytes; use futures::Stream; use itertools::Itertools; use scraper::{Html, Selector}; +use serde::{Deserialize, Serialize}; use tracing::instrument; use url::Url; use super::{ - MIKAN_BUCKET_KEY, MikanBangumiRssLink, MikanClient, extract_mikan_bangumi_id_from_rss_link, + MIKAN_BUCKET_KEY, MikanAuthSecrecy, MikanBangumiRssLink, MikanClient, + extract_mikan_bangumi_id_from_rss_link, }; use crate::{ - app::AppContext, + app::AppContextTrait, errors::{RError, RResult}, extract::{ html::{extract_background_image_src_from_style_attr, extract_inner_text_from_element_ref}, @@ -20,6 +22,7 @@ use crate::{ }, fetch::{html::fetch_html, image::fetch_image}, storage::StorageContentCategory, + tasks::core::{StandardStreamTaskReplayLayout, StreamTaskRunnerTrait}, }; #[derive(Clone, Debug, PartialEq)] @@ -34,7 +37,7 @@ pub struct MikanEpisodeMeta { pub mikan_episode_id: String, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct MikanBangumiMeta { pub homepage: Url, pub origin_poster_src: Option, @@ -123,12 +126,12 @@ pub async fn extract_mikan_poster_meta_from_src( } pub async fn extract_mikan_bangumi_poster_meta_from_src_with_cache( - ctx: &AppContext, + ctx: &dyn AppContextTrait, origin_poster_src_url: Url, subscriber_id: i32, ) -> RResult { - let dal_client = &ctx.storage; - let mikan_client = &ctx.mikan; + let dal_client = ctx.storage(); + let mikan_client = ctx.mikan(); if let Some(poster_src) = dal_client .exists_object( StorageContentCategory::Image, @@ -346,126 +349,141 @@ pub async fn extract_mikan_bangumi_meta_from_bangumi_homepage( }) } -/** - * @logined-required - */ -#[instrument(skip_all, fields(my_bangumi_page_url = my_bangumi_page_url.as_str()))] +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ExtractMikanBangumisMetaFromMyBangumiRequest { + pub my_bangumi_page_url: Url, + pub auth_securcy: Option, +} + +pub type ExtractMikanBangumisMetaFromMyBangumiTask = + StandardStreamTaskReplayLayout; + +#[instrument(skip_all, fields(my_bangumi_page_url, auth_securcy = ?auth_securcy, history = history.len()))] pub fn extract_mikan_bangumis_meta_from_my_bangumi_page( - http_client: &MikanClient, + context: Arc, my_bangumi_page_url: Url, -) -> impl Stream> { + auth_securcy: Option, + history: &[Arc>], +) -> impl Stream> { try_stream! { + let http_client = &context.mikan().fork_with_auth(auth_securcy.clone())?; + let mikan_base_url = Url::parse(&my_bangumi_page_url.origin().unicode_serialization())?; let content = fetch_html(http_client, my_bangumi_page_url.clone()).await?; - let bangumi_container_selector = &Selector::parse(".sk-bangumi .an-ul>li").unwrap(); - let bangumi_info_selector = &Selector::parse(".an-info a.an-text").unwrap(); - let bangumi_poster_selector = - &Selector::parse("span[data-src][data-bangumiid], span[data-bangumiid][style]") - .unwrap(); let fansub_container_selector = - &Selector::parse(".js-expand_bangumi-subgroup.js-subscribed").unwrap(); + &Selector::parse(".js-expand_bangumi-subgroup.js-subscribed").unwrap(); let fansub_title_selector = &Selector::parse(".tag-res-name[title]").unwrap(); let fansub_id_selector = &Selector::parse(".active[data-subtitlegroupid][data-bangumiid]").unwrap(); - let bangumi_iters = { + let bangumi_items = { let html = Html::parse_document(&content); + let bangumi_container_selector = &Selector::parse(".sk-bangumi .an-ul>li").unwrap(); + let bangumi_info_selector = &Selector::parse(".an-info a.an-text").unwrap(); + let bangumi_poster_selector = + &Selector::parse("span[data-src][data-bangumiid], span[data-bangumiid][style]") + .unwrap(); html.select(bangumi_container_selector) - .filter_map(|bangumi_elem| { - let title_and_href_elem = bangumi_elem.select(bangumi_info_selector).next(); - let poster_elem = bangumi_elem.select(bangumi_poster_selector).next(); - if let (Some(bangumi_home_page_url), Some(bangumi_title)) = ( - title_and_href_elem.and_then(|elem| elem.attr("href")), - title_and_href_elem.and_then(|elem| elem.attr("title")), - ) { - let origin_poster_src = poster_elem.and_then(|ele| { - ele.attr("data-src") - .and_then(|data_src| { - extract_image_src_from_str(data_src, &mikan_base_url) + .filter_map(|bangumi_elem| { + let title_and_href_elem = + bangumi_elem.select(bangumi_info_selector).next(); + let poster_elem = bangumi_elem.select(bangumi_poster_selector).next(); + if let (Some(bangumi_home_page_url), Some(bangumi_title)) = ( + title_and_href_elem.and_then(|elem| elem.attr("href")), + title_and_href_elem.and_then(|elem| elem.attr("title")), + ) { + let origin_poster_src = poster_elem.and_then(|ele| { + ele.attr("data-src") + .and_then(|data_src| { + extract_image_src_from_str(data_src, &mikan_base_url) + }) + .or_else(|| { + ele.attr("style").and_then(|style| { + extract_background_image_src_from_style_attr( + style, + &mikan_base_url, + ) }) - .or_else(|| { - ele.attr("style").and_then(|style| { - extract_background_image_src_from_style_attr( - style, - &mikan_base_url, - ) - }) - }) - }); - let bangumi_title = bangumi_title.to_string(); - let bangumi_home_page_url = - my_bangumi_page_url.join(bangumi_home_page_url).ok()?; - let MikanBangumiHomepage { - mikan_bangumi_id, .. - } = extract_mikan_bangumi_id_from_homepage(&bangumi_home_page_url)?; - if let Some(origin_poster_src) = origin_poster_src.as_ref() { - tracing::trace!( - origin_poster_src = origin_poster_src.as_str(), - bangumi_title, - mikan_bangumi_id, - "bangumi info extracted" - ); - } else { - tracing::warn!( - bangumi_title, - mikan_bangumi_id, - "bangumi info extracted, but failed to extract poster_src" - ); - } - let bangumi_expand_info_url = build_mikan_bangumi_expand_info_url( - mikan_base_url.clone(), - &mikan_bangumi_id, - ); - Some(( + }) + }); + let bangumi_title = bangumi_title.to_string(); + let bangumi_home_page_url = + my_bangumi_page_url.join(bangumi_home_page_url).ok()?; + let MikanBangumiHomepage { + mikan_bangumi_id, .. + } = extract_mikan_bangumi_id_from_homepage(&bangumi_home_page_url)?; + if let Some(origin_poster_src) = origin_poster_src.as_ref() { + tracing::trace!( + origin_poster_src = origin_poster_src.as_str(), bangumi_title, mikan_bangumi_id, - bangumi_expand_info_url, - origin_poster_src, - )) + "bangumi info extracted" + ); } else { - None + tracing::warn!( + bangumi_title, + mikan_bangumi_id, + "bangumi info extracted, but failed to extract poster_src" + ); } - }) - .collect_vec() - }; - - for (bangumi_title, mikan_bangumi_id, bangumi_expand_info_url, origin_poster_src) in - bangumi_iters - { - if let Some((fansub_name, mikan_fansub_id)) = { - let bangumi_expand_info_content = fetch_html(http_client, bangumi_expand_info_url).await?; - let bangumi_expand_info_fragment = Html::parse_fragment(&bangumi_expand_info_content); - bangumi_expand_info_fragment.select(fansub_container_selector).next().and_then(|fansub_info| { - if let (Some(fansub_name), Some(mikan_fansub_id)) = ( - fansub_info - .select(fansub_title_selector) - .next() - .and_then(|ele| ele.attr("title")) - .map(String::from), - fansub_info - .select(fansub_id_selector) - .next() - .and_then(|ele| ele.attr("data-subtitlegroupid")) - .map(String::from) - ) { - Some((fansub_name, mikan_fansub_id)) - } else { - None - } - }) - } { - tracing::trace!( - fansub_name, - mikan_fansub_id, - "subscribed fansub extracted" - ); - yield MikanBangumiMeta { - homepage: build_mikan_bangumi_homepage( + let bangumi_expand_info_url = build_mikan_bangumi_expand_info_url( mikan_base_url.clone(), &mikan_bangumi_id, + ); + Some(( + bangumi_title, + mikan_bangumi_id, + bangumi_expand_info_url, + origin_poster_src, + )) + } else { + None + } + }) + .collect_vec() + }; + + for (idx, (bangumi_title, mikan_bangumi_id, bangumi_expand_info_url, origin_poster_src)) in + bangumi_items.iter().enumerate() + { + + if history.get(idx).is_some() { + continue; + } else if let Some((fansub_name, mikan_fansub_id)) = { + let bangumi_expand_info_content = + fetch_html(http_client, bangumi_expand_info_url.clone()).await?; + let bangumi_expand_info_fragment = + Html::parse_fragment(&bangumi_expand_info_content); + bangumi_expand_info_fragment + .select(fansub_container_selector) + .next() + .and_then(|fansub_info| { + if let (Some(fansub_name), Some(mikan_fansub_id)) = ( + fansub_info + .select(fansub_title_selector) + .next() + .and_then(|ele| ele.attr("title")) + .map(String::from), + fansub_info + .select(fansub_id_selector) + .next() + .and_then(|ele| ele.attr("data-subtitlegroupid")) + .map(String::from), + ) { + Some((fansub_name, mikan_fansub_id)) + } else { + None + } + }) + } { + tracing::trace!(fansub_name, mikan_fansub_id, "subscribed fansub extracted"); + let item = MikanBangumiMeta { + homepage: build_mikan_bangumi_homepage( + mikan_base_url.clone(), + mikan_bangumi_id, Some(&mikan_fansub_id), ), bangumi_title: bangumi_title.to_string(), @@ -474,11 +492,28 @@ pub fn extract_mikan_bangumis_meta_from_my_bangumi_page( fansub: Some(fansub_name), origin_poster_src: origin_poster_src.clone(), }; + yield item; } } } } +impl StreamTaskRunnerTrait for ExtractMikanBangumisMetaFromMyBangumiTask { + fn run( + context: Arc, + request: &Self::Request, + history: &[Arc>], + ) -> impl Stream> { + let context = context.clone(); + extract_mikan_bangumis_meta_from_my_bangumi_page( + context, + request.my_bangumi_page_url.clone(), + request.auth_securcy.clone(), + history, + ) + } +} + #[cfg(test)] mod test { #![allow(unused_variables)] @@ -486,22 +521,19 @@ mod test { use futures::{TryStreamExt, pin_mut}; use http::header; use rstest::{fixture, rstest}; - use secrecy::SecretString; use tracing::Level; use url::Url; use zune_image::{codecs::ImageFormat, image::Image}; use super::*; - use crate::{ - extract::mikan::{ - MikanAuthSecrecy, web_extract::extract_mikan_bangumis_meta_from_my_bangumi_page, - }, - test_utils::{mikan::build_testing_mikan_client, tracing::init_testing_tracing}, + use crate::test_utils::{ + app::UnitTestAppContext, mikan::build_testing_mikan_client, + tracing::try_init_testing_tracing, }; #[fixture] fn before_each() { - init_testing_tracing(Level::INFO); + try_init_testing_tracing(Level::INFO); } #[rstest] @@ -625,7 +657,11 @@ mod test { let my_bangumi_page_url = mikan_base_url.join("/Home/MyBangumi")?; - let mikan_client = build_testing_mikan_client(mikan_base_url.clone()).await?; + let context = Arc::new( + UnitTestAppContext::builder() + .mikan(build_testing_mikan_client(mikan_base_url.clone()).await?) + .build(), + ); { let my_bangumi_without_cookie_mock = mikan_server @@ -636,8 +672,10 @@ mod test { .await; let bangumi_metas = extract_mikan_bangumis_meta_from_my_bangumi_page( - &mikan_client, + context.clone(), my_bangumi_page_url.clone(), + None, + &[], ); pin_mut!(bangumi_metas); @@ -671,8 +709,8 @@ mod test { .create_async() .await; - let mikan_client_with_cookie = mikan_client.fork_with_auth(MikanAuthSecrecy { - cookie: SecretString::from( + let auth_secrecy = Some(MikanAuthSecrecy { + cookie: String::from( "mikan-announcement=1; .AspNetCore.Antiforgery.abc=abc; \ .AspNetCore.Identity.Application=abc; ", ), @@ -680,11 +718,13 @@ mod test { "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like \ Gecko) Chrome/133.0.0.0 Safari/537.36 Edg/133.0.0.0", )), - })?; + }); let bangumi_metas = extract_mikan_bangumis_meta_from_my_bangumi_page( - &mikan_client_with_cookie, + context.clone(), my_bangumi_page_url, + auth_secrecy, + &[], ); pin_mut!(bangumi_metas); let bangumi_metas = bangumi_metas.try_collect::>().await?; diff --git a/apps/recorder/src/fetch/client/secrecy.rs b/apps/recorder/src/fetch/client/secrecy.rs index 7e4db74..2df2606 100644 --- a/apps/recorder/src/fetch/client/secrecy.rs +++ b/apps/recorder/src/fetch/client/secrecy.rs @@ -2,12 +2,11 @@ use std::sync::Arc; use cookie::Cookie; use reqwest::{ClientBuilder, cookie::Jar}; -use secrecy::zeroize::Zeroize; use url::Url; use crate::errors::RError; -pub trait HttpClientSecrecyDataTrait: Zeroize { +pub trait HttpClientSecrecyDataTrait { fn attach_secrecy_to_client(&self, client_builder: ClientBuilder) -> ClientBuilder { client_builder } @@ -37,13 +36,6 @@ impl HttpClientCookiesAuth { } } -impl Zeroize for HttpClientCookiesAuth { - fn zeroize(&mut self) { - self.cookie_jar = Arc::new(Jar::default()); - self.user_agent = None; - } -} - impl HttpClientSecrecyDataTrait for HttpClientCookiesAuth { fn attach_secrecy_to_client(&self, client_builder: ClientBuilder) -> ClientBuilder { let mut client_builder = client_builder.cookie_provider(self.cookie_jar.clone()); diff --git a/apps/recorder/src/graphql/mod.rs b/apps/recorder/src/graphql/mod.rs index a8c497c..5b07576 100644 --- a/apps/recorder/src/graphql/mod.rs +++ b/apps/recorder/src/graphql/mod.rs @@ -3,6 +3,7 @@ pub mod filter; pub mod guard; pub mod schema_root; pub mod service; +pub mod subscriptions; pub mod util; pub use config::GraphQLConfig; diff --git a/apps/recorder/src/graphql/subscriptions/mod.rs b/apps/recorder/src/graphql/subscriptions/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/apps/recorder/src/models/auth.rs b/apps/recorder/src/models/auth.rs index b343d45..3952160 100644 --- a/apps/recorder/src/models/auth.rs +++ b/apps/recorder/src/models/auth.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use super::subscribers::{self, SEED_SUBSCRIBER}; use crate::{ - app::AppContext, + app::AppContextTrait, errors::{RError, RResult}, }; @@ -57,8 +57,8 @@ impl Related for Entity { impl ActiveModelBehavior for ActiveModel {} impl Model { - pub async fn find_by_pid(ctx: &AppContext, pid: &str) -> RResult { - let db = &ctx.db; + pub async fn find_by_pid(ctx: &dyn AppContextTrait, pid: &str) -> RResult { + let db = ctx.db(); let subscriber_auth = Entity::find() .filter(Column::Pid.eq(pid)) .one(db) @@ -67,8 +67,8 @@ impl Model { Ok(subscriber_auth) } - pub async fn create_from_oidc(ctx: &AppContext, sub: String) -> RResult { - let db = &ctx.db; + pub async fn create_from_oidc(ctx: &dyn AppContextTrait, sub: String) -> RResult { + let db = ctx.db(); let txn = db.begin().await?; diff --git a/apps/recorder/src/models/bangumi.rs b/apps/recorder/src/models/bangumi.rs index 04dccb9..cf910c4 100644 --- a/apps/recorder/src/models/bangumi.rs +++ b/apps/recorder/src/models/bangumi.rs @@ -4,7 +4,7 @@ use sea_orm::{ActiveValue, FromJsonQueryResult, entity::prelude::*, sea_query::O use serde::{Deserialize, Serialize}; use super::subscription_bangumi; -use crate::{app::AppContext, errors::RResult}; +use crate::{app::AppContextTrait, errors::RResult}; #[derive( Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult, SimpleObject, @@ -113,7 +113,7 @@ pub enum RelatedEntity { impl Model { pub async fn get_or_insert_from_mikan( - ctx: &AppContext, + ctx: &dyn AppContextTrait, subscriber_id: i32, subscription_id: i32, mikan_bangumi_id: String, @@ -123,7 +123,7 @@ impl Model { where F: AsyncFnOnce(&mut ActiveModel) -> RResult<()>, { - let db = &ctx.db; + let db = ctx.db(); if let Some(existed) = Entity::find() .filter( Column::MikanBangumiId diff --git a/apps/recorder/src/models/episodes.rs b/apps/recorder/src/models/episodes.rs index e220229..89e830f 100644 --- a/apps/recorder/src/models/episodes.rs +++ b/apps/recorder/src/models/episodes.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use super::{bangumi, query::InsertManyReturningExt, subscription_episode}; use crate::{ - app::AppContext, + app::AppContextTrait, errors::RResult, extract::{ mikan::{MikanEpisodeMeta, build_mikan_episode_homepage}, @@ -136,12 +136,12 @@ pub struct MikanEpsiodeCreation { impl Model { pub async fn add_episodes( - ctx: &AppContext, + ctx: &dyn AppContextTrait, subscriber_id: i32, subscription_id: i32, creations: impl IntoIterator, ) -> RResult<()> { - let db = &ctx.db; + let db = ctx.db(); let new_episode_active_modes = creations .into_iter() .map(|cr| ActiveModel::from_mikan_episode_meta(ctx, cr)) @@ -189,7 +189,7 @@ impl Model { impl ActiveModel { pub fn from_mikan_episode_meta( - ctx: &AppContext, + ctx: &dyn AppContextTrait, creation: MikanEpsiodeCreation, ) -> color_eyre::eyre::Result { let item = creation.episode; @@ -201,7 +201,7 @@ impl ActiveModel { .ok() .unwrap_or_default(); let homepage = - build_mikan_episode_homepage(ctx.mikan.base_url().clone(), &item.mikan_episode_id); + build_mikan_episode_homepage(ctx.mikan().base_url().clone(), &item.mikan_episode_id); Ok(Self { mikan_episode_id: ActiveValue::Set(Some(item.mikan_episode_id)), diff --git a/apps/recorder/src/models/mod.rs b/apps/recorder/src/models/mod.rs index 9f98f5d..788c98f 100644 --- a/apps/recorder/src/models/mod.rs +++ b/apps/recorder/src/models/mod.rs @@ -8,3 +8,5 @@ pub mod subscribers; pub mod subscription_bangumi; pub mod subscription_episode; pub mod subscriptions; +pub mod task_stream_item; +pub mod tasks; diff --git a/apps/recorder/src/models/subscribers.rs b/apps/recorder/src/models/subscribers.rs index 2ecec75..658f5cc 100644 --- a/apps/recorder/src/models/subscribers.rs +++ b/apps/recorder/src/models/subscribers.rs @@ -4,7 +4,7 @@ use sea_orm::{ActiveValue, FromJsonQueryResult, TransactionTrait, entity::prelud use serde::{Deserialize, Serialize}; use crate::{ - app::AppContext, + app::AppContextTrait, errors::{RError, RResult}, }; @@ -95,13 +95,13 @@ pub struct SubscriberIdParams { impl ActiveModelBehavior for ActiveModel {} impl Model { - pub async fn find_seed_subscriber_id(ctx: &AppContext) -> RResult { + pub async fn find_seed_subscriber_id(ctx: &dyn AppContextTrait) -> RResult { let subscriber_auth = crate::models::auth::Model::find_by_pid(ctx, SEED_SUBSCRIBER).await?; Ok(subscriber_auth.subscriber_id) } - pub async fn find_by_id(ctx: &AppContext, id: i32) -> RResult { - let db = &ctx.db; + pub async fn find_by_id(ctx: &dyn AppContextTrait, id: i32) -> RResult { + let db = ctx.db(); let subscriber = Entity::find_by_id(id) .one(db) @@ -110,8 +110,8 @@ impl Model { Ok(subscriber) } - pub async fn create_root(ctx: &AppContext) -> RResult { - let db = &ctx.db; + pub async fn create_root(ctx: &dyn AppContextTrait) -> RResult { + let db = ctx.db(); let txn = db.begin().await?; let user = ActiveModel { diff --git a/apps/recorder/src/models/subscriptions.rs b/apps/recorder/src/models/subscriptions.rs index 65b5d6a..7033140 100644 --- a/apps/recorder/src/models/subscriptions.rs +++ b/apps/recorder/src/models/subscriptions.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use super::{bangumi, episodes, query::filter_values_in}; use crate::{ - app::AppContext, + app::AppContextTrait, errors::RResult, extract::{ mikan::{ @@ -179,22 +179,22 @@ impl ActiveModel { impl Model { pub async fn add_subscription( - ctx: &AppContext, + ctx: &dyn AppContextTrait, create_dto: SubscriptionCreateDto, subscriber_id: i32, ) -> RResult { - let db = &ctx.db; + let db = ctx.db(); let subscription = ActiveModel::from_create_dto(create_dto, subscriber_id); Ok(subscription.insert(db).await?) } pub async fn toggle_with_ids( - ctx: &AppContext, + ctx: &dyn AppContextTrait, ids: impl Iterator, enabled: bool, ) -> RResult<()> { - let db = &ctx.db; + let db = ctx.db(); Entity::update_many() .col_expr(Column::Enabled, Expr::value(enabled)) .filter(Column::Id.is_in(ids)) @@ -203,8 +203,11 @@ impl Model { Ok(()) } - pub async fn delete_with_ids(ctx: &AppContext, ids: impl Iterator) -> RResult<()> { - let db = &ctx.db; + pub async fn delete_with_ids( + ctx: &dyn AppContextTrait, + ids: impl Iterator, + ) -> RResult<()> { + let db = ctx.db(); Entity::delete_many() .filter(Column::Id.is_in(ids)) .exec(db) @@ -212,16 +215,16 @@ impl Model { Ok(()) } - pub async fn pull_subscription(&self, ctx: &AppContext) -> RResult<()> { + pub async fn pull_subscription(&self, ctx: &dyn AppContextTrait) -> RResult<()> { match &self.category { SubscriptionCategory::Mikan => { - let mikan_client = &ctx.mikan; + let mikan_client = ctx.mikan(); let channel = extract_mikan_rss_channel_from_rss_link(mikan_client, &self.source_url).await?; let items = channel.into_items(); - let db = &ctx.db; + let db = ctx.db(); let items = items.into_iter().collect_vec(); let mut stmt = filter_values_in( @@ -266,7 +269,7 @@ impl Model { for ((mikan_bangumi_id, mikan_fansub_id), new_ep_metas) in new_mikan_bangumi_groups { - let mikan_base_url = ctx.mikan.base_url(); + let mikan_base_url = ctx.mikan().base_url(); let bgm_homepage = build_mikan_bangumi_homepage( mikan_base_url.clone(), &mikan_bangumi_id, diff --git a/apps/recorder/src/models/task_stream_item.rs b/apps/recorder/src/models/task_stream_item.rs new file mode 100644 index 0000000..648882d --- /dev/null +++ b/apps/recorder/src/models/task_stream_item.rs @@ -0,0 +1,62 @@ +use async_trait::async_trait; +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive( + Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, +)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")] +#[serde(rename_all = "snake_case")] +pub enum TaskStatus { + #[sea_orm(string_value = "r")] + Running, + #[sea_orm(string_value = "s")] + Success, + #[sea_orm(string_value = "f")] + Failed, +} + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] +#[sea_orm(table_name = "tasks")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + pub task_id: i32, + pub subscriber_id: i32, + pub item: serde_json::Value, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::subscribers::Entity", + from = "Column::SubscriberId", + to = "super::subscribers::Column::Id", + on_update = "Cascade", + on_delete = "Cascade" + )] + Subscriber, + #[sea_orm( + belongs_to = "super::tasks::Entity", + from = "Column::TaskId", + to = "super::tasks::Column::Id", + on_update = "Cascade", + on_delete = "Cascade" + )] + Task, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Subscriber.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Task.def() + } +} + +#[async_trait] +impl ActiveModelBehavior for ActiveModel {} diff --git a/apps/recorder/src/models/tasks.rs b/apps/recorder/src/models/tasks.rs new file mode 100644 index 0000000..bd62d45 --- /dev/null +++ b/apps/recorder/src/models/tasks.rs @@ -0,0 +1,95 @@ +use async_trait::async_trait; +use sea_orm::{QuerySelect, entity::prelude::*}; +use serde::{Deserialize, Serialize}; + +use crate::{app::AppContextTrait, errors::RResult}; + +#[derive( + Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, +)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")] +#[serde(rename_all = "snake_case")] +pub enum TaskStatus { + #[sea_orm(string_value = "p")] + Pending, + #[sea_orm(string_value = "r")] + Running, + #[sea_orm(string_value = "s")] + Success, + #[sea_orm(string_value = "f")] + Failed, +} + +#[derive( + Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, +)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")] +#[serde(rename_all = "snake_case")] +pub enum TaskMode { + #[sea_orm(string_value = "stream")] + Stream, + #[sea_orm(string_value = "future")] + Future, +} + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] +#[sea_orm(table_name = "tasks")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + pub subscriber_id: i32, + pub task_mode: TaskMode, + pub task_status: TaskStatus, + pub task_type: String, + pub state_data: serde_json::Value, + pub request_data: serde_json::Value, + pub error_data: serde_json::Value, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm(has_many = "super::task_stream_item::Entity")] + StreamItem, + #[sea_orm( + belongs_to = "super::subscribers::Entity", + from = "Column::SubscriberId", + to = "super::subscribers::Column::Id", + on_update = "Cascade", + on_delete = "Cascade" + )] + Subscriber, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Subscriber.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::StreamItem.def() + } +} + +impl Model { + pub async fn find_stream_task_by_id( + ctx: &dyn AppContextTrait, + task_id: i32, + ) -> RResult)>> { + let db = ctx.db(); + let res = Entity::find() + .filter(Column::Id.eq(task_id)) + .filter(Column::TaskMode.eq(TaskMode::Stream)) + .find_with_related(super::task_stream_item::Entity) + .limit(1) + .all(db) + .await? + .pop(); + + Ok(res) + } +} + +#[async_trait] +impl ActiveModelBehavior for ActiveModel {} diff --git a/apps/recorder/src/tasks/core.rs b/apps/recorder/src/tasks/core.rs index 4f5ddcd..0c25694 100644 --- a/apps/recorder/src/tasks/core.rs +++ b/apps/recorder/src/tasks/core.rs @@ -1,16 +1,277 @@ -use std::borrow::Cow; +use std::{borrow::Cow, sync::Arc}; -use async_trait::async_trait; +use async_stream::stream; +use futures::{Stream, StreamExt, pin_mut}; +use serde::{Serialize, de::DeserializeOwned}; +use tokio::sync::{RwLock, mpsc}; -use crate::{app::AppContext, errors::RResult}; +use crate::{ + app::AppContextTrait, + errors::{RError, RResult}, + models, +}; -pub struct TaskVars {} - -#[async_trait] -pub trait Task: Send + Sync { - fn task_name() -> Cow<'static, str>; - - fn task_id(&self) -> &str; - - async fn run(&self, app_context: &AppContext, vars: &TaskVars) -> RResult<()>; +pub struct TaskMeta { + pub subscriber_id: i32, + pub task_id: i32, + pub task_kind: Cow<'static, str>, +} + +pub struct ReplayChannel { + sender: mpsc::UnboundedSender, + channels: Arc>>>, + buffer: Arc>>, +} + +impl ReplayChannel { + pub fn new(history: Vec) -> Self { + let (tx, mut rx) = mpsc::unbounded_channel::(); + let channels = Arc::new(RwLock::new(Vec::>::new())); + let buffer = Arc::new(RwLock::new(history)); + { + let channels = channels.clone(); + let buffer = buffer.clone(); + tokio::spawn(async move { + loop { + match rx.recv().await { + Some(value) => { + let mut w = buffer.write().await; + let senders = channels.read().await; + for s in senders.iter() { + if !s.is_closed() { + if let Err(err) = s.send(value.clone()) { + tracing::error!(err = %err, "replay-channel broadcast to other subscribers error"); + } + } + } + w.push(value); + } + None => { + drop(rx); + let mut cs = channels.write().await; + cs.clear(); + break; + } + } + } + }); + } + + Self { + sender: tx, + channels, + buffer, + } + } + + pub fn sender(&self) -> &mpsc::UnboundedSender { + &self.sender + } + + pub async fn receiver(&self) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded_channel(); + let items = self.buffer.read().await; + for item in items.iter() { + if let Err(err) = tx.send(item.clone()) { + tracing::error!(err = %err, "replay-channel send replay value to other subscribers error"); + } + } + if !self.sender.is_closed() { + let mut sw = self.channels.write().await; + sw.push(tx); + } + rx + } + + pub async fn close(&self) { + let mut senders = self.channels.write().await; + senders.clear(); + } +} + +pub trait StreamTaskCoreTrait: Sized { + type Request: Serialize + DeserializeOwned; + type Item: Serialize + DeserializeOwned; + + fn task_id(&self) -> i32; + + fn task_kind(&self) -> &str; + + fn new(meta: TaskMeta, request: Self::Request) -> Self; + + fn request(&self) -> &Self::Request; +} + +pub trait StreamTaskReplayLayoutTrait: StreamTaskCoreTrait { + fn history(&self) -> &[Arc>]; + + fn resume_from_model( + task: models::tasks::Model, + stream_items: Vec, + ) -> RResult; + + fn running_receiver( + &self, + ) -> impl Future>>>>; + + #[allow(clippy::type_complexity)] + fn init_receiver( + &self, + ) -> impl Future< + Output = ( + mpsc::UnboundedSender>>, + mpsc::UnboundedReceiver>>, + ), + >; + + fn serialize_request(request: Self::Request) -> RResult { + serde_json::to_value(request).map_err(RError::from) + } + + fn serialize_item(item: RResult) -> RResult { + serde_json::to_value(item).map_err(RError::from) + } + + fn deserialize_request(request: serde_json::Value) -> RResult { + serde_json::from_value(request).map_err(RError::from) + } + + fn deserialize_item(item: serde_json::Value) -> RResult> { + serde_json::from_value(item).map_err(RError::from) + } +} + +pub trait StreamTaskRunnerTrait: StreamTaskCoreTrait { + fn run( + context: Arc, + request: &Self::Request, + history: &[Arc>], + ) -> impl Stream>; +} + +pub trait StreamTaskReplayRunnerTrait: StreamTaskRunnerTrait + StreamTaskReplayLayoutTrait { + fn run_shared( + &self, + context: Arc, + ) -> impl Stream>> { + stream! { + if let Some(mut receiver) = self.running_receiver().await { + while let Some(item) = receiver.recv().await { + yield item + } + } else { + let (tx, _) = self.init_receiver().await; + let stream = Self::run(context, self.request(), self.history()); + + pin_mut!(stream); + + while let Some(item) = stream.next().await { + let item = Arc::new(item); + if let Err(err) = tx.send(item.clone()) { + tracing::error!(task_id = self.task_id(), task_kind = self.task_kind(), err = %err, "run shared send error"); + } + yield item + } + }; + + } + } +} + +pub struct StandardStreamTaskReplayLayout +where + Request: Serialize + DeserializeOwned, + Item: Serialize + DeserializeOwned + Sync + Send + 'static, +{ + pub meta: TaskMeta, + pub request: Request, + pub history: Vec>>, + #[allow(clippy::type_complexity)] + pub channel: Arc>>>>>, +} + +impl StreamTaskCoreTrait for StandardStreamTaskReplayLayout +where + Request: Serialize + DeserializeOwned, + Item: Serialize + DeserializeOwned + Sync + Send + 'static, +{ + type Request = Request; + type Item = Item; + + fn task_id(&self) -> i32 { + self.meta.task_id + } + + fn request(&self) -> &Self::Request { + &self.request + } + + fn task_kind(&self) -> &str { + &self.meta.task_kind + } + + fn new(meta: TaskMeta, request: Self::Request) -> Self { + Self { + meta, + request, + history: vec![], + channel: Arc::new(RwLock::new(None)), + } + } +} + +impl StreamTaskReplayLayoutTrait for StandardStreamTaskReplayLayout +where + Request: Serialize + DeserializeOwned, + Item: Serialize + DeserializeOwned + Sync + Send + 'static, +{ + fn history(&self) -> &[Arc>] { + &self.history + } + + fn resume_from_model( + task: models::tasks::Model, + stream_items: Vec, + ) -> RResult { + Ok(Self { + meta: TaskMeta { + task_id: task.id, + subscriber_id: task.subscriber_id, + task_kind: Cow::Owned(task.task_type), + }, + request: Self::deserialize_request(task.request_data)?, + history: stream_items + .into_iter() + .map(|m| Self::deserialize_item(m.item).map(Arc::new)) + .collect::>>()?, + channel: Arc::new(RwLock::new(None)), + }) + } + + async fn running_receiver(&self) -> Option>>> { + if let Some(channel) = self.channel.read().await.as_ref() { + Some(channel.receiver().await) + } else { + None + } + } + + async fn init_receiver( + &self, + ) -> ( + mpsc::UnboundedSender>>, + mpsc::UnboundedReceiver>>, + ) { + let channel = ReplayChannel::new(self.history.clone()); + let rx = channel.receiver().await; + let sender = channel.sender().clone(); + + { + { + let mut w = self.channel.write().await; + *w = Some(channel); + } + } + (sender, rx) + } } diff --git a/apps/recorder/src/tasks/create_mikan_bangumi_subscriptions_from_my_bangumi_page.rs b/apps/recorder/src/tasks/create_mikan_bangumi_subscriptions_from_my_bangumi_page.rs deleted file mode 100644 index 9c2dea8..0000000 --- a/apps/recorder/src/tasks/create_mikan_bangumi_subscriptions_from_my_bangumi_page.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::borrow::Cow; - -use futures::{TryStreamExt, pin_mut}; - -use super::core::{Task, TaskVars}; -use crate::{ - app::AppContext, - errors::RResult, - extract::mikan::{ - MikanAuthSecrecy, web_extract::extract_mikan_bangumis_meta_from_my_bangumi_page, - }, -}; - -#[derive(Debug)] -pub struct CreateMikanRSSFromMyBangumiTask { - pub subscriber_id: i32, - pub task_id: String, - pub auth_secrecy: MikanAuthSecrecy, -} - -#[async_trait::async_trait] -impl Task for CreateMikanRSSFromMyBangumiTask { - fn task_name() -> Cow<'static, str> { - Cow::Borrowed("create-mikan-rss-from-my-bangumi") - } - - fn task_id(&self) -> &str { - &self.task_id - } - - async fn run(&self, app_context: &AppContext, _vars: &TaskVars) -> RResult<()> { - let mikan_client = app_context - .mikan - .fork_with_auth(self.auth_secrecy.clone())?; - - { - let bangumi_metas = extract_mikan_bangumis_meta_from_my_bangumi_page( - &mikan_client, - mikan_client.base_url().join("/Home/MyBangumi")?, - ); - - pin_mut!(bangumi_metas); - - let _bangumi_metas = bangumi_metas.try_collect::>().await?; - } - - Ok(()) - } -} diff --git a/apps/recorder/src/tasks/mikan/create_mikan_bangumi_subscriptions_from_my_bangumi_page.rs b/apps/recorder/src/tasks/mikan/create_mikan_bangumi_subscriptions_from_my_bangumi_page.rs new file mode 100644 index 0000000..84cbaab --- /dev/null +++ b/apps/recorder/src/tasks/mikan/create_mikan_bangumi_subscriptions_from_my_bangumi_page.rs @@ -0,0 +1,38 @@ +// use std::borrow::Cow; + +// use futures::{TryStreamExt, pin_mut}; + +// use crate::{ +// app::AppContextTrait, +// errors::RResult, +// extract::mikan::{ +// MikanAuthSecrecy, +// web_extract::extract_mikan_bangumis_meta_from_my_bangumi_page, }, +// tasks::core::{StreamTaskTrait, TaskVars}, +// }; + +// #[derive(Debug)] +// pub struct CreateMikanRSSFromMyBangumiTask { +// pub subscriber_id: i32, +// pub task_id: String, +// pub auth_secrecy: MikanAuthSecrecy, +// } + +// async fn run(app_context: &dyn AppContextTrait, _vars: &TaskVars) -> +// RResult<()> { let mikan_client = app_context +// .mikan +// .fork_with_auth(todo!().auth_secrecy.clone())?; + +// { +// let bangumi_metas = extract_mikan_bangumis_meta_from_my_bangumi_page( +// &mikan_client, +// mikan_client.base_url().join("/Home/MyBangumi")?, +// ); + +// pin_mut!(bangumi_metas); + +// let _bangumi_metas = bangumi_metas.try_collect::>().await?; +// } + +// Ok(()) +// } diff --git a/apps/recorder/src/tasks/mikan/mod.rs b/apps/recorder/src/tasks/mikan/mod.rs new file mode 100644 index 0000000..401a983 --- /dev/null +++ b/apps/recorder/src/tasks/mikan/mod.rs @@ -0,0 +1 @@ +pub mod create_mikan_bangumi_subscriptions_from_my_bangumi_page; diff --git a/apps/recorder/src/tasks/mod.rs b/apps/recorder/src/tasks/mod.rs index 908032d..00efa78 100644 --- a/apps/recorder/src/tasks/mod.rs +++ b/apps/recorder/src/tasks/mod.rs @@ -1,2 +1,4 @@ pub mod core; -pub mod create_mikan_bangumi_subscriptions_from_my_bangumi_page; +pub mod mikan; +pub mod service; +pub mod registry; diff --git a/apps/recorder/src/tasks/registry.rs b/apps/recorder/src/tasks/registry.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/apps/recorder/src/tasks/registry.rs @@ -0,0 +1 @@ + diff --git a/apps/recorder/src/tasks/service.rs b/apps/recorder/src/tasks/service.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/apps/recorder/src/tasks/service.rs @@ -0,0 +1 @@ + diff --git a/apps/recorder/src/test_utils/app.rs b/apps/recorder/src/test_utils/app.rs new file mode 100644 index 0000000..aa17845 --- /dev/null +++ b/apps/recorder/src/test_utils/app.rs @@ -0,0 +1,62 @@ +use typed_builder::TypedBuilder; + +use crate::app::AppContextTrait; + +#[derive(TypedBuilder)] +#[builder(field_defaults(default, setter(strip_option)))] +pub struct UnitTestAppContext { + logger: Option, + db: Option, + config: Option, + cache: Option, + mikan: Option, + auth: Option, + graphql: Option, + storage: Option, + #[builder(default = Some(String::from(env!("CARGO_MANIFEST_DIR"))))] + working_dir: Option, + #[builder(default = crate::app::Environment::Testing, setter(!strip_option))] + environment: crate::app::Environment, +} + +impl AppContextTrait for UnitTestAppContext { + fn logger(&self) -> &crate::logger::LoggerService { + self.logger.as_ref().expect("should set logger") + } + + fn db(&self) -> &crate::database::DatabaseService { + self.db.as_ref().expect("should set db") + } + + fn config(&self) -> &crate::app::AppConfig { + self.config.as_ref().expect("should set config") + } + + fn cache(&self) -> &crate::cache::CacheService { + self.cache.as_ref().expect("should set cache") + } + + fn mikan(&self) -> &crate::extract::mikan::MikanClient { + self.mikan.as_ref().expect("should set mikan") + } + + fn auth(&self) -> &crate::auth::AuthService { + self.auth.as_ref().expect("should set auth") + } + + fn graphql(&self) -> &crate::graphql::GraphQLService { + self.graphql.as_ref().expect("should set graphql") + } + + fn storage(&self) -> &crate::storage::StorageService { + self.storage.as_ref().expect("should set storage") + } + + fn environment(&self) -> &crate::app::Environment { + &self.environment + } + + fn working_dir(&self) -> &String { + self.working_dir.as_ref().expect("should set working_dir") + } +} diff --git a/apps/recorder/src/test_utils/mod.rs b/apps/recorder/src/test_utils/mod.rs index b0d9db0..940f548 100644 --- a/apps/recorder/src/test_utils/mod.rs +++ b/apps/recorder/src/test_utils/mod.rs @@ -1,3 +1,4 @@ +pub mod app; pub mod fetch; pub mod mikan; #[cfg(feature = "testcontainers")] diff --git a/apps/recorder/src/test_utils/tracing.rs b/apps/recorder/src/test_utils/tracing.rs index 96d5fc6..03cf18e 100644 --- a/apps/recorder/src/test_utils/tracing.rs +++ b/apps/recorder/src/test_utils/tracing.rs @@ -1,10 +1,10 @@ use tracing::Level; use tracing_subscriber::EnvFilter; -pub fn init_testing_tracing(level: Level) { +pub fn try_init_testing_tracing(level: Level) { let crate_name = env!("CARGO_PKG_NAME"); let level = level.as_str().to_lowercase(); let filter = EnvFilter::new(format!("{}[]={}", crate_name, level)) .add_directive(format!("mockito[]={}", level).parse().unwrap()); - tracing_subscriber::fmt().with_env_filter(filter).init(); + let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init(); } diff --git a/apps/recorder/src/web/controller/core.rs b/apps/recorder/src/web/controller/core.rs index e1a9ce3..9fa9634 100644 --- a/apps/recorder/src/web/controller/core.rs +++ b/apps/recorder/src/web/controller/core.rs @@ -2,19 +2,23 @@ use std::{borrow::Cow, sync::Arc}; use axum::Router; -use crate::app::AppContext; +use crate::app::AppContextTrait; pub trait ControllerTrait: Sized { - fn apply_to(self, router: Router>) -> Router>; + fn apply_to(self, router: Router>) + -> Router>; } pub struct PrefixController { prefix: Cow<'static, str>, - router: Router>, + router: Router>, } impl PrefixController { - pub fn new(prefix: impl Into>, router: Router>) -> Self { + pub fn new( + prefix: impl Into>, + router: Router>, + ) -> Self { Self { prefix: prefix.into(), router, @@ -23,7 +27,10 @@ impl PrefixController { } impl ControllerTrait for PrefixController { - fn apply_to(self, router: Router>) -> Router> { + fn apply_to( + self, + router: Router>, + ) -> Router> { router.nest(&self.prefix, self.router) } } @@ -35,14 +42,17 @@ pub enum Controller { impl Controller { pub fn from_prefix( prefix: impl Into>, - router: Router>, + router: Router>, ) -> Self { Self::Prefix(PrefixController::new(prefix, router)) } } impl ControllerTrait for Controller { - fn apply_to(self, router: Router>) -> Router> { + fn apply_to( + self, + router: Router>, + ) -> Router> { match self { Self::Prefix(p) => p.apply_to(router), } diff --git a/apps/recorder/src/web/controller/graphql/mod.rs b/apps/recorder/src/web/controller/graphql/mod.rs index 03d37d1..8fdd5be 100644 --- a/apps/recorder/src/web/controller/graphql/mod.rs +++ b/apps/recorder/src/web/controller/graphql/mod.rs @@ -5,7 +5,7 @@ use axum::{Extension, Router, extract::State, middleware::from_fn_with_state, ro use super::core::Controller; use crate::{ - app::AppContext, + app::AppContextTrait, auth::{AuthUserInfo, header_www_authenticate_middleware}, errors::RResult, }; @@ -13,11 +13,11 @@ use crate::{ pub const CONTROLLER_PREFIX: &str = "/api/graphql"; async fn graphql_handler( - State(ctx): State>, + State(ctx): State>, Extension(auth_user_info): Extension, req: GraphQLRequest, ) -> GraphQLResponse { - let graphql_service = &ctx.graphql; + let graphql_service = ctx.graphql(); let mut req = req.into_inner(); req = req.data(auth_user_info); @@ -25,8 +25,8 @@ async fn graphql_handler( graphql_service.schema.execute(req).await.into() } -pub async fn create(ctx: Arc) -> RResult { - let router = Router::>::new() +pub async fn create(ctx: Arc) -> RResult { + let router = Router::>::new() .route("/", post(graphql_handler)) .layer(from_fn_with_state(ctx, header_www_authenticate_middleware)); Ok(Controller::from_prefix(CONTROLLER_PREFIX, router)) diff --git a/apps/recorder/src/web/controller/oidc/mod.rs b/apps/recorder/src/web/controller/oidc/mod.rs index 75f306d..dc79113 100644 --- a/apps/recorder/src/web/controller/oidc/mod.rs +++ b/apps/recorder/src/web/controller/oidc/mod.rs @@ -9,7 +9,7 @@ use axum::{ use super::core::Controller; use crate::{ - app::AppContext, + app::AppContextTrait, auth::{ AuthError, AuthService, AuthServiceTrait, oidc::{OidcAuthCallbackPayload, OidcAuthCallbackQuery, OidcAuthRequest}, @@ -22,10 +22,10 @@ use crate::{ pub const CONTROLLER_PREFIX: &str = "/api/oidc"; async fn oidc_callback( - State(ctx): State>, + State(ctx): State>, Query(query): Query, ) -> Result, AuthError> { - let auth_service = &ctx.auth; + let auth_service = ctx.auth(); if let AuthService::Oidc(oidc_auth_service) = auth_service { let response = oidc_auth_service .extract_authorization_request_callback(query) @@ -40,10 +40,10 @@ async fn oidc_callback( } async fn oidc_auth( - State(ctx): State>, + State(ctx): State>, parts: Parts, ) -> Result, AuthError> { - let auth_service = &ctx.auth; + let auth_service = ctx.auth(); if let AuthService::Oidc(oidc_auth_service) = auth_service { let mut redirect_uri = ForwardedRelatedInfo::from_request_parts(&parts) .resolved_origin() @@ -70,8 +70,8 @@ async fn oidc_auth( } } -pub async fn create(_context: Arc) -> RResult { - let router = Router::>::new() +pub async fn create(_context: Arc) -> RResult { + let router = Router::>::new() .route("/auth", get(oidc_auth)) .route("/callback", get(oidc_callback)); diff --git a/apps/recorder/src/web/middleware/catch_panic.rs b/apps/recorder/src/web/middleware/catch_panic.rs index 12e1e5a..21108f7 100644 --- a/apps/recorder/src/web/middleware/catch_panic.rs +++ b/apps/recorder/src/web/middleware/catch_panic.rs @@ -12,7 +12,7 @@ use http::StatusCode; use serde::{Deserialize, Serialize}; use tower_http::catch_panic::CatchPanicLayer; -use crate::{app::AppContext, errors::RResult, web::middleware::MiddlewareLayer}; +use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct CatchPanic { @@ -52,7 +52,10 @@ impl MiddlewareLayer for CatchPanic { } /// Applies the Catch Panic middleware layer to the Axum router. - fn apply(&self, app: Router>) -> RResult>> { + fn apply( + &self, + app: Router>, + ) -> RResult>> { Ok(app.layer(CatchPanicLayer::custom(handle_panic))) } } diff --git a/apps/recorder/src/web/middleware/compression.rs b/apps/recorder/src/web/middleware/compression.rs index 09e2531..a9d539f 100644 --- a/apps/recorder/src/web/middleware/compression.rs +++ b/apps/recorder/src/web/middleware/compression.rs @@ -11,7 +11,7 @@ use axum::Router; use serde::{Deserialize, Serialize}; use tower_http::compression::CompressionLayer; -use crate::{app::AppContext, errors::RResult, web::middleware::MiddlewareLayer}; +use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Compression { @@ -35,7 +35,10 @@ impl MiddlewareLayer for Compression { } /// Applies the Compression middleware layer to the Axum router. - fn apply(&self, app: Router>) -> RResult>> { + fn apply( + &self, + app: Router>, + ) -> RResult>> { Ok(app.layer(CompressionLayer::new())) } } diff --git a/apps/recorder/src/web/middleware/cors.rs b/apps/recorder/src/web/middleware/cors.rs index 43d81eb..565b0f9 100644 --- a/apps/recorder/src/web/middleware/cors.rs +++ b/apps/recorder/src/web/middleware/cors.rs @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use tower_http::cors::{self, Any}; -use crate::{app::AppContext, web::middleware::MiddlewareLayer, errors::RResult}; +use crate::{app::AppContextTrait, web::middleware::MiddlewareLayer, errors::RResult}; /// CORS middleware configuration #[derive(Debug, Clone, Deserialize, Serialize)] @@ -157,7 +157,7 @@ impl MiddlewareLayer for Cors { } /// Applies the CORS middleware layer to the Axum router. - fn apply(&self, app: Router>) -> RResult>> { + fn apply(&self, app: Router>) -> RResult>> { Ok(app.layer(self.cors()?)) } } diff --git a/apps/recorder/src/web/middleware/etag.rs b/apps/recorder/src/web/middleware/etag.rs index 545a41d..f61ace3 100644 --- a/apps/recorder/src/web/middleware/etag.rs +++ b/apps/recorder/src/web/middleware/etag.rs @@ -25,7 +25,7 @@ use futures_util::future::BoxFuture; use serde::{Deserialize, Serialize}; use tower::{Layer, Service}; -use crate::{app::AppContext, errors::RResult, web::middleware::MiddlewareLayer}; +use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Etag { @@ -49,7 +49,10 @@ impl MiddlewareLayer for Etag { } /// Applies the `ETag` middleware to the application router. - fn apply(&self, app: Router>) -> RResult>> { + fn apply( + &self, + app: Router>, + ) -> RResult>> { Ok(app.layer(EtagLayer)) } } diff --git a/apps/recorder/src/web/middleware/logger.rs b/apps/recorder/src/web/middleware/logger.rs index 331fa7c..d900e20 100644 --- a/apps/recorder/src/web/middleware/logger.rs +++ b/apps/recorder/src/web/middleware/logger.rs @@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize}; use tower_http::{add_extension::AddExtensionLayer, trace::TraceLayer}; use crate::{ - app::{AppContext, Environment}, + app::{AppContextTrait, Environment}, errors::RResult, web::middleware::{MiddlewareLayer, request_id::LocoRequestId}, }; @@ -35,10 +35,10 @@ pub struct Middleware { /// Creates a new instance of [`Middleware`] by cloning the [`Config`] /// configuration. #[must_use] -pub fn new(config: &Config, context: Arc) -> Middleware { +pub fn new(config: &Config, context: Arc) -> Middleware { Middleware { config: config.clone(), - environment: context.environment.clone(), + environment: context.environment().clone(), } } @@ -67,7 +67,10 @@ impl MiddlewareLayer for Middleware { /// The `TraceLayer` is customized with `make_span_with` to extract /// request-specific details like method, URI, version, user agent, and /// request ID, then create a tracing span for the request. - fn apply(&self, app: Router>) -> RResult>> { + fn apply( + &self, + app: Router>, + ) -> RResult>> { Ok(app .layer( TraceLayer::new_for_http().make_span_with(|request: &http::Request<_>| { diff --git a/apps/recorder/src/web/middleware/mod.rs b/apps/recorder/src/web/middleware/mod.rs index 02a5a43..165aad9 100644 --- a/apps/recorder/src/web/middleware/mod.rs +++ b/apps/recorder/src/web/middleware/mod.rs @@ -14,7 +14,7 @@ use std::sync::Arc; use axum::Router; use serde::{Deserialize, Serialize}; -use crate::{app::AppContext, errors::RResult}; +use crate::{app::AppContextTrait, errors::RResult}; /// Trait representing the behavior of middleware components in the application. /// When implementing a new middleware, make sure to go over this checklist: @@ -52,14 +52,17 @@ pub trait MiddlewareLayer { /// # Errors /// /// If there is an issue when adding the middleware to the router. - fn apply(&self, app: Router>) -> RResult>>; + fn apply( + &self, + app: Router>, + ) -> RResult>>; } #[allow(clippy::unnecessary_lazy_evaluations)] #[must_use] -pub fn default_middleware_stack(ctx: Arc) -> Vec> { +pub fn default_middleware_stack(ctx: Arc) -> Vec> { // Shortened reference to middlewares - let middlewares = &ctx.config.server.middlewares; + let middlewares = &ctx.config().server.middlewares; vec![ // CORS middleware with a default if none diff --git a/apps/recorder/src/web/middleware/remote_ip.rs b/apps/recorder/src/web/middleware/remote_ip.rs index 78f1ca4..8f3527d 100644 --- a/apps/recorder/src/web/middleware/remote_ip.rs +++ b/apps/recorder/src/web/middleware/remote_ip.rs @@ -31,7 +31,7 @@ use tower::{Layer, Service}; use tracing::error; use crate::{ - app::AppContext, + app::AppContextTrait, errors::{RError, RResult}, web::middleware::MiddlewareLayer, }; @@ -123,7 +123,10 @@ impl MiddlewareLayer for RemoteIpMiddleware { } /// Applies the Remote IP middleware to the given Axum router. - fn apply(&self, app: Router>) -> RResult>> { + fn apply( + &self, + app: Router>, + ) -> RResult>> { Ok(app.layer(RemoteIPLayer::new(self)?)) } } diff --git a/apps/recorder/src/web/middleware/request_id.rs b/apps/recorder/src/web/middleware/request_id.rs index 595ee68..240af4e 100644 --- a/apps/recorder/src/web/middleware/request_id.rs +++ b/apps/recorder/src/web/middleware/request_id.rs @@ -11,7 +11,7 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::{web::middleware::MiddlewareLayer, app::AppContext, errors::RResult}; +use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer}; const X_REQUEST_ID: &str = "x-request-id"; const MAX_LEN: usize = 255; @@ -52,7 +52,10 @@ impl MiddlewareLayer for RequestId { /// /// # Errors /// This function returns an error if the middleware cannot be applied. - fn apply(&self, app: Router>) -> RResult>> { + fn apply( + &self, + app: Router>, + ) -> RResult>> { Ok(app.layer(axum::middleware::from_fn(request_id_middleware))) } } diff --git a/apps/recorder/src/web/middleware/secure_headers.rs b/apps/recorder/src/web/middleware/secure_headers.rs index 138dc27..31b6b8e 100644 --- a/apps/recorder/src/web/middleware/secure_headers.rs +++ b/apps/recorder/src/web/middleware/secure_headers.rs @@ -21,7 +21,7 @@ use serde_json::{self, json}; use tower::{Layer, Service}; use crate::{ - app::AppContext, + app::AppContextTrait, web::middleware::MiddlewareLayer, errors::{RError, RResult}, }; @@ -115,7 +115,7 @@ impl MiddlewareLayer for SecureHeader { } /// Applies the secure headers layer to the application router - fn apply(&self, app: Router>) -> RResult>> { + fn apply(&self, app: Router>) -> RResult>> { Ok(app.layer(SecureHeaders::new(self)?)) } } diff --git a/apps/recorder/src/web/middleware/timeout.rs b/apps/recorder/src/web/middleware/timeout.rs index 1d4f9b0..69b9f36 100644 --- a/apps/recorder/src/web/middleware/timeout.rs +++ b/apps/recorder/src/web/middleware/timeout.rs @@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use tower_http::timeout::TimeoutLayer; -use crate::{app::AppContext, errors::RResult, web::middleware::MiddlewareLayer}; +use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer}; /// Timeout middleware configuration #[derive(Debug, Clone, Deserialize, Serialize)] @@ -58,7 +58,10 @@ impl MiddlewareLayer for TimeOut { /// This method wraps the provided [`AXRouter`] in a [`TimeoutLayer`], /// ensuring that requests exceeding the specified timeout duration will /// be interrupted. - fn apply(&self, app: Router>) -> RResult>> { + fn apply( + &self, + app: Router>, + ) -> RResult>> { Ok(app.layer(TimeoutLayer::new(Duration::from_millis(self.timeout)))) } }