feat: add replay-stream-tasks pattern support

This commit is contained in:
master 2025-03-08 16:43:00 +08:00
parent e66573b315
commit f94e175082
47 changed files with 989 additions and 318 deletions

62
Cargo.lock generated
View File

@ -164,6 +164,12 @@ version = "1.0.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b964d184e89d9b6b67dd2715bc8e74cf3107fb2b529990c90cf517326150bf4"
[[package]]
name = "append-only-vec"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7992085ec035cfe96992dd31bfd495a2ebd31969bb95f624471cb6c0b349e571"
[[package]]
name = "arrayvec"
version = "0.7.6"
@ -1242,6 +1248,22 @@ dependencies = [
"syn 2.0.98",
]
[[package]]
name = "ctor"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7747ac3a66a06f4ee6718686c8ea976d2d05fb30ada93ebd76b3f9aef97257c"
dependencies = [
"ctor-proc-macro",
"dtor",
]
[[package]]
name = "ctor-proc-macro"
version = "0.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f211af61d8efdd104f96e57adf5e426ba1bc3ed7a4ead616e15e5881fd79c4d"
[[package]]
name = "curve25519-dalek"
version = "4.1.3"
@ -1501,6 +1523,21 @@ dependencies = [
"dtoa",
]
[[package]]
name = "dtor"
version = "0.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bf39a0bfd1f94d62ffdb2802a7e6244c0f34f6ebacf5d4c26547d08cd1d67a5"
dependencies = [
"dtor-proc-macro",
]
[[package]]
name = "dtor-proc-macro"
version = "0.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7454e41ff9012c00d53cf7f475c5e3afa3b91b7c90568495495e8d9bf47a1055"
[[package]]
name = "dyn-clone"
version = "1.0.18"
@ -3008,6 +3045,16 @@ dependencies = [
"mutate_once",
]
[[package]]
name = "kanal"
version = "0.1.0-pre8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05d55519627edaf7fd0f29981f6dc03fb52df3f5b257130eb8d0bf2801ea1d7"
dependencies = [
"futures-core",
"lock_api",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@ -4491,6 +4538,7 @@ name = "recorder"
version = "0.1.0"
dependencies = [
"anyhow",
"append-only-vec",
"async-graphql",
"async-graphql-axum",
"async-stream",
@ -4504,6 +4552,7 @@ dependencies = [
"clap",
"color-eyre",
"cookie",
"ctor",
"dotenv",
"fancy-regex",
"fastrand",
@ -4519,6 +4568,7 @@ dependencies = [
"ipnetwork",
"itertools 0.14.0",
"jwt-authorizer",
"kanal",
"lazy_static",
"leaky-bucket",
"librqbit-core",
@ -4544,7 +4594,6 @@ dependencies = [
"sea-orm",
"sea-orm-migration",
"seaography",
"secrecy",
"serde",
"serde_json",
"serde_variant",
@ -4561,6 +4610,7 @@ dependencies = [
"tracing",
"tracing-appender",
"tracing-subscriber",
"typed-builder",
"url",
"uuid",
"zune-image",
@ -5291,16 +5341,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "secrecy"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e891af845473308773346dc847b2c23ee78fe442e0472ac50e22a18a93d3ae5a"
dependencies = [
"serde",
"zeroize",
]
[[package]]
name = "security-framework"
version = "2.11.1"

View File

@ -121,7 +121,6 @@ http-cache = { version = "0.20.0", features = [
http-cache-semantics = "2.1.0"
dotenv = "0.15.0"
nom = "8.0.0"
secrecy = { version = "0.10.3", features = ["serde"] }
http = "1.2.0"
cookie = "0.18.1"
async-stream = "0.3.6"
@ -130,6 +129,10 @@ tracing-appender = "0.2.3"
clap = "4.5.31"
futures-util = "0.3.31"
ipnetwork = "0.21.1"
kanal = "0.1.0-pre8"
append-only-vec = "0.1.7"
typed-builder = "0.20.0"
ctor = "0.4.0"
[dev-dependencies]
serial_test = "3"

View File

@ -19,7 +19,7 @@
// use sea_orm::ColumnTrait;
// use sea_orm_migration::MigratorTrait;
// async fn pull_mikan_bangumi_rss(ctx: &AppContext) -> RResult<()> {
// async fn pull_mikan_bangumi_rss(ctx: &dyn AppContextTrait) -> RResult<()> {
// let rss_link = "https://mikanani.me/RSS/Bangumi?bangumiId=3416&subgroupid=370";
// // let rss_link =
@ -27,7 +27,7 @@
// let subscription = if let Some(subscription) =
// subscriptions::Entity::find()
// .filter(subscriptions::Column::SourceUrl.eq(String::from(rss_link)))
// .one(&ctx.db)
// .one(ctx.db())
// .await?
// {
// subscription
@ -55,7 +55,7 @@
// let BootResult {
// app_context: ctx, ..
// } = loco_rs::boot::run_app::<App1>(&StartMode::ServerOnly, ctx).await?;
// Migrator::up(&ctx.db, None).await?;
// Migrator::up(ctx.db(), None).await?;
// Ok(ctx)
// }

View File

@ -5,17 +5,30 @@ use crate::{
storage::StorageService,
};
pub trait AppContextTrait: Send + Sync {
fn logger(&self) -> &LoggerService;
fn db(&self) -> &DatabaseService;
fn config(&self) -> &AppConfig;
fn cache(&self) -> &CacheService;
fn mikan(&self) -> &MikanClient;
fn auth(&self) -> &AuthService;
fn graphql(&self) -> &GraphQLService;
fn storage(&self) -> &StorageService;
fn working_dir(&self) -> &String;
fn environment(&self) -> &Environment;
}
pub struct AppContext {
pub logger: LoggerService,
pub db: DatabaseService,
pub config: AppConfig,
pub cache: CacheService,
pub mikan: MikanClient,
pub auth: AuthService,
pub graphql: GraphQLService,
pub storage: StorageService,
pub working_dir: String,
pub environment: Environment,
logger: LoggerService,
db: DatabaseService,
config: AppConfig,
cache: CacheService,
mikan: MikanClient,
auth: AuthService,
graphql: GraphQLService,
storage: StorageService,
working_dir: String,
environment: Environment,
}
impl AppContext {
@ -48,3 +61,35 @@ impl AppContext {
})
}
}
impl AppContextTrait for AppContext {
fn logger(&self) -> &LoggerService {
&self.logger
}
fn db(&self) -> &DatabaseService {
&self.db
}
fn config(&self) -> &AppConfig {
&self.config
}
fn cache(&self) -> &CacheService {
&self.cache
}
fn mikan(&self) -> &MikanClient {
&self.mikan
}
fn auth(&self) -> &AuthService {
&self.auth
}
fn graphql(&self) -> &GraphQLService {
&self.graphql
}
fn storage(&self) -> &StorageService {
&self.storage
}
fn working_dir(&self) -> &String {
&self.working_dir
}
fn environment(&self) -> &Environment {
&self.environment
}
}

View File

@ -4,7 +4,7 @@ use axum::Router;
use futures::try_join;
use tokio::signal;
use super::{builder::AppBuilder, context::AppContext};
use super::{builder::AppBuilder, context::AppContextTrait};
use crate::{
errors::RResult,
web::{
@ -14,7 +14,7 @@ use crate::{
};
pub struct App {
pub context: Arc<AppContext>,
pub context: Arc<dyn AppContextTrait>,
pub builder: AppBuilder,
}
@ -25,14 +25,14 @@ impl App {
pub async fn serve(&self) -> RResult<()> {
let context = &self.context;
let config = &context.config;
let config = context.config();
let listener = tokio::net::TcpListener::bind(&format!(
"{}:{}",
config.server.binding, config.server.port
))
.await?;
let mut router = Router::<Arc<AppContext>>::new();
let mut router = Router::<Arc<dyn AppContextTrait>>::new();
let (graphqlc, oidcc) = try_join!(
controller::graphql::create(context.clone()),

View File

@ -8,5 +8,5 @@ pub use core::App;
pub use builder::AppBuilder;
pub use config::AppConfig;
pub use context::AppContext;
pub use context::{AppContext, AppContextTrait};
pub use env::Environment;

View File

@ -9,7 +9,7 @@ use super::{
service::{AuthServiceTrait, AuthUserInfo},
};
use crate::{
app::AppContext,
app::AppContextTrait,
models::{auth::AuthType, subscribers::SEED_SUBSCRIBER},
};
@ -64,7 +64,7 @@ pub struct BasicAuthService {
impl AuthServiceTrait for BasicAuthService {
async fn extract_user_info(
&self,
ctx: &AppContext,
ctx: &dyn AppContextTrait,
request: &mut Parts,
) -> Result<AuthUserInfo, AuthError> {
if let Ok(AuthBasic {

View File

@ -7,18 +7,21 @@ use axum::{
response::{IntoResponse, Response},
};
use crate::{app::AppContext, auth::AuthServiceTrait};
use crate::{app::AppContextTrait, auth::AuthServiceTrait};
pub async fn header_www_authenticate_middleware(
State(ctx): State<Arc<AppContext>>,
State(ctx): State<Arc<dyn AppContextTrait>>,
request: Request,
next: Next,
) -> Response {
let auth_service = &ctx.auth;
let auth_service = ctx.auth();
let (mut parts, body) = request.into_parts();
let mut response = match auth_service.extract_user_info(&ctx, &mut parts).await {
let mut response = match auth_service
.extract_user_info(ctx.as_ref() as &dyn AppContextTrait, &mut parts)
.await
{
Ok(auth_user_info) => {
let mut request = Request::from_parts(parts, body);
request.extensions_mut().insert(auth_user_info);

View File

@ -23,7 +23,7 @@ use super::{
errors::AuthError,
service::{AuthServiceTrait, AuthUserInfo},
};
use crate::{app::AppContext, errors::RError, fetch::HttpClient, models::auth::AuthType};
use crate::{app::AppContextTrait, errors::RError, fetch::HttpClient, models::auth::AuthType};
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct OidcAuthClaims {
@ -261,7 +261,7 @@ impl OidcAuthService {
impl AuthServiceTrait for OidcAuthService {
async fn extract_user_info(
&self,
ctx: &AppContext,
ctx: &dyn AppContextTrait,
request: &mut Parts,
) -> Result<AuthUserInfo, AuthError> {
let config = &self.config;

View File

@ -1,4 +1,4 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use axum::{
@ -17,7 +17,7 @@ use super::{
oidc::{OidcAuthClaims, OidcAuthService},
};
use crate::{
app::AppContext,
app::AppContextTrait,
fetch::{
HttpClient, HttpClientConfig,
client::{HttpClientCacheBackendConfig, HttpClientCachePresetConfig},
@ -31,17 +31,17 @@ pub struct AuthUserInfo {
pub auth_type: AuthType,
}
impl FromRequestParts<AppContext> for AuthUserInfo {
impl FromRequestParts<Arc<dyn AppContextTrait>> for AuthUserInfo {
type Rejection = Response;
async fn from_request_parts(
parts: &mut Parts,
state: &AppContext,
state: &Arc<dyn AppContextTrait>,
) -> Result<Self, Self::Rejection> {
let auth_service = &state.auth;
let auth_service = state.auth();
auth_service
.extract_user_info(state, parts)
.extract_user_info(state.as_ref(), parts)
.await
.map_err(|err| err.into_response())
}
@ -51,7 +51,7 @@ impl FromRequestParts<AppContext> for AuthUserInfo {
pub trait AuthServiceTrait {
async fn extract_user_info(
&self,
ctx: &AppContext,
ctx: &dyn AppContextTrait,
request: &mut Parts,
) -> Result<AuthUserInfo, AuthError>;
fn www_authenticate_header_value(&self) -> Option<HeaderValue>;
@ -104,7 +104,7 @@ impl AuthService {
impl AuthServiceTrait for AuthService {
async fn extract_user_info(
&self,
ctx: &AppContext,
ctx: &dyn AppContextTrait,
request: &mut Parts,
) -> Result<AuthUserInfo, AuthError> {
match self {

View File

@ -2,6 +2,7 @@ use std::{borrow::Cow, error::Error as StdError};
use axum::response::{IntoResponse, Response};
use http::StatusCode;
use serde::{Deserialize, Deserializer, Serialize};
use thiserror::Error as ThisError;
use crate::{auth::AuthError, fetch::HttpClientError};
@ -113,4 +114,23 @@ impl IntoResponse for RError {
}
}
impl Serialize for RError {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.to_string())
}
}
impl<'de> Deserialize<'de> for RError {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
Ok(Self::CustomMessageString(s))
}
}
pub type RResult<T> = Result<T, RError>;

View File

@ -1,7 +1,7 @@
use std::ops::Deref;
use std::{fmt::Debug, ops::Deref};
use reqwest_middleware::ClientWithMiddleware;
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use url::Url;
use super::MikanConfig;
@ -10,15 +10,24 @@ use crate::{
fetch::{HttpClient, HttpClientTrait, client::HttpClientCookiesAuth},
};
#[derive(Debug, Default, Clone)]
#[derive(Default, Clone, Deserialize, Serialize)]
pub struct MikanAuthSecrecy {
pub cookie: SecretString,
pub cookie: String,
pub user_agent: Option<String>,
}
impl Debug for MikanAuthSecrecy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MikanAuthSecrecy")
.field("cookie", &String::from("[secrecy]"))
.field("user_agent", &String::from("[secrecy]"))
.finish()
}
}
impl MikanAuthSecrecy {
pub fn into_cookie_auth(self, url: &Url) -> Result<HttpClientCookiesAuth, RError> {
HttpClientCookiesAuth::from_cookies(self.cookie.expose_secret(), url, self.user_agent)
HttpClientCookiesAuth::from_cookies(&self.cookie, url, self.user_agent)
}
}
@ -38,9 +47,13 @@ impl MikanClient {
})
}
pub fn fork_with_auth(&self, secrecy: MikanAuthSecrecy) -> Result<Self, RError> {
let cookie_auth = secrecy.into_cookie_auth(&self.base_url)?;
let fork = self.http_client.fork().attach_secrecy(cookie_auth);
pub fn fork_with_auth(&self, secrecy: Option<MikanAuthSecrecy>) -> Result<Self, RError> {
let mut fork = self.http_client.fork();
if let Some(secrecy) = secrecy {
let cookie_auth = secrecy.into_cookie_auth(&self.base_url)?;
fork = fork.attach_secrecy(cookie_auth);
}
Ok(Self {
http_client: HttpClient::from_fork(fork)?,

View File

@ -1,18 +1,20 @@
use std::borrow::Cow;
use std::{borrow::Cow, sync::Arc};
use async_stream::try_stream;
use bytes::Bytes;
use futures::Stream;
use itertools::Itertools;
use scraper::{Html, Selector};
use serde::{Deserialize, Serialize};
use tracing::instrument;
use url::Url;
use super::{
MIKAN_BUCKET_KEY, MikanBangumiRssLink, MikanClient, extract_mikan_bangumi_id_from_rss_link,
MIKAN_BUCKET_KEY, MikanAuthSecrecy, MikanBangumiRssLink, MikanClient,
extract_mikan_bangumi_id_from_rss_link,
};
use crate::{
app::AppContext,
app::AppContextTrait,
errors::{RError, RResult},
extract::{
html::{extract_background_image_src_from_style_attr, extract_inner_text_from_element_ref},
@ -20,6 +22,7 @@ use crate::{
},
fetch::{html::fetch_html, image::fetch_image},
storage::StorageContentCategory,
tasks::core::{StandardStreamTaskReplayLayout, StreamTaskRunnerTrait},
};
#[derive(Clone, Debug, PartialEq)]
@ -34,7 +37,7 @@ pub struct MikanEpisodeMeta {
pub mikan_episode_id: String,
}
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct MikanBangumiMeta {
pub homepage: Url,
pub origin_poster_src: Option<Url>,
@ -123,12 +126,12 @@ pub async fn extract_mikan_poster_meta_from_src(
}
pub async fn extract_mikan_bangumi_poster_meta_from_src_with_cache(
ctx: &AppContext,
ctx: &dyn AppContextTrait,
origin_poster_src_url: Url,
subscriber_id: i32,
) -> RResult<MikanBangumiPosterMeta> {
let dal_client = &ctx.storage;
let mikan_client = &ctx.mikan;
let dal_client = ctx.storage();
let mikan_client = ctx.mikan();
if let Some(poster_src) = dal_client
.exists_object(
StorageContentCategory::Image,
@ -346,126 +349,141 @@ pub async fn extract_mikan_bangumi_meta_from_bangumi_homepage(
})
}
/**
* @logined-required
*/
#[instrument(skip_all, fields(my_bangumi_page_url = my_bangumi_page_url.as_str()))]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExtractMikanBangumisMetaFromMyBangumiRequest {
pub my_bangumi_page_url: Url,
pub auth_securcy: Option<MikanAuthSecrecy>,
}
pub type ExtractMikanBangumisMetaFromMyBangumiTask =
StandardStreamTaskReplayLayout<ExtractMikanBangumisMetaFromMyBangumiRequest, MikanBangumiMeta>;
#[instrument(skip_all, fields(my_bangumi_page_url, auth_securcy = ?auth_securcy, history = history.len()))]
pub fn extract_mikan_bangumis_meta_from_my_bangumi_page(
http_client: &MikanClient,
context: Arc<dyn AppContextTrait>,
my_bangumi_page_url: Url,
) -> impl Stream<Item = Result<MikanBangumiMeta, RError>> {
auth_securcy: Option<MikanAuthSecrecy>,
history: &[Arc<RResult<MikanBangumiMeta>>],
) -> impl Stream<Item = RResult<MikanBangumiMeta>> {
try_stream! {
let http_client = &context.mikan().fork_with_auth(auth_securcy.clone())?;
let mikan_base_url = Url::parse(&my_bangumi_page_url.origin().unicode_serialization())?;
let content = fetch_html(http_client, my_bangumi_page_url.clone()).await?;
let bangumi_container_selector = &Selector::parse(".sk-bangumi .an-ul>li").unwrap();
let bangumi_info_selector = &Selector::parse(".an-info a.an-text").unwrap();
let bangumi_poster_selector =
&Selector::parse("span[data-src][data-bangumiid], span[data-bangumiid][style]")
.unwrap();
let fansub_container_selector =
&Selector::parse(".js-expand_bangumi-subgroup.js-subscribed").unwrap();
&Selector::parse(".js-expand_bangumi-subgroup.js-subscribed").unwrap();
let fansub_title_selector = &Selector::parse(".tag-res-name[title]").unwrap();
let fansub_id_selector =
&Selector::parse(".active[data-subtitlegroupid][data-bangumiid]").unwrap();
let bangumi_iters = {
let bangumi_items = {
let html = Html::parse_document(&content);
let bangumi_container_selector = &Selector::parse(".sk-bangumi .an-ul>li").unwrap();
let bangumi_info_selector = &Selector::parse(".an-info a.an-text").unwrap();
let bangumi_poster_selector =
&Selector::parse("span[data-src][data-bangumiid], span[data-bangumiid][style]")
.unwrap();
html.select(bangumi_container_selector)
.filter_map(|bangumi_elem| {
let title_and_href_elem = bangumi_elem.select(bangumi_info_selector).next();
let poster_elem = bangumi_elem.select(bangumi_poster_selector).next();
if let (Some(bangumi_home_page_url), Some(bangumi_title)) = (
title_and_href_elem.and_then(|elem| elem.attr("href")),
title_and_href_elem.and_then(|elem| elem.attr("title")),
) {
let origin_poster_src = poster_elem.and_then(|ele| {
ele.attr("data-src")
.and_then(|data_src| {
extract_image_src_from_str(data_src, &mikan_base_url)
.filter_map(|bangumi_elem| {
let title_and_href_elem =
bangumi_elem.select(bangumi_info_selector).next();
let poster_elem = bangumi_elem.select(bangumi_poster_selector).next();
if let (Some(bangumi_home_page_url), Some(bangumi_title)) = (
title_and_href_elem.and_then(|elem| elem.attr("href")),
title_and_href_elem.and_then(|elem| elem.attr("title")),
) {
let origin_poster_src = poster_elem.and_then(|ele| {
ele.attr("data-src")
.and_then(|data_src| {
extract_image_src_from_str(data_src, &mikan_base_url)
})
.or_else(|| {
ele.attr("style").and_then(|style| {
extract_background_image_src_from_style_attr(
style,
&mikan_base_url,
)
})
.or_else(|| {
ele.attr("style").and_then(|style| {
extract_background_image_src_from_style_attr(
style,
&mikan_base_url,
)
})
})
});
let bangumi_title = bangumi_title.to_string();
let bangumi_home_page_url =
my_bangumi_page_url.join(bangumi_home_page_url).ok()?;
let MikanBangumiHomepage {
mikan_bangumi_id, ..
} = extract_mikan_bangumi_id_from_homepage(&bangumi_home_page_url)?;
if let Some(origin_poster_src) = origin_poster_src.as_ref() {
tracing::trace!(
origin_poster_src = origin_poster_src.as_str(),
bangumi_title,
mikan_bangumi_id,
"bangumi info extracted"
);
} else {
tracing::warn!(
bangumi_title,
mikan_bangumi_id,
"bangumi info extracted, but failed to extract poster_src"
);
}
let bangumi_expand_info_url = build_mikan_bangumi_expand_info_url(
mikan_base_url.clone(),
&mikan_bangumi_id,
);
Some((
})
});
let bangumi_title = bangumi_title.to_string();
let bangumi_home_page_url =
my_bangumi_page_url.join(bangumi_home_page_url).ok()?;
let MikanBangumiHomepage {
mikan_bangumi_id, ..
} = extract_mikan_bangumi_id_from_homepage(&bangumi_home_page_url)?;
if let Some(origin_poster_src) = origin_poster_src.as_ref() {
tracing::trace!(
origin_poster_src = origin_poster_src.as_str(),
bangumi_title,
mikan_bangumi_id,
bangumi_expand_info_url,
origin_poster_src,
))
"bangumi info extracted"
);
} else {
None
tracing::warn!(
bangumi_title,
mikan_bangumi_id,
"bangumi info extracted, but failed to extract poster_src"
);
}
})
.collect_vec()
};
for (bangumi_title, mikan_bangumi_id, bangumi_expand_info_url, origin_poster_src) in
bangumi_iters
{
if let Some((fansub_name, mikan_fansub_id)) = {
let bangumi_expand_info_content = fetch_html(http_client, bangumi_expand_info_url).await?;
let bangumi_expand_info_fragment = Html::parse_fragment(&bangumi_expand_info_content);
bangumi_expand_info_fragment.select(fansub_container_selector).next().and_then(|fansub_info| {
if let (Some(fansub_name), Some(mikan_fansub_id)) = (
fansub_info
.select(fansub_title_selector)
.next()
.and_then(|ele| ele.attr("title"))
.map(String::from),
fansub_info
.select(fansub_id_selector)
.next()
.and_then(|ele| ele.attr("data-subtitlegroupid"))
.map(String::from)
) {
Some((fansub_name, mikan_fansub_id))
} else {
None
}
})
} {
tracing::trace!(
fansub_name,
mikan_fansub_id,
"subscribed fansub extracted"
);
yield MikanBangumiMeta {
homepage: build_mikan_bangumi_homepage(
let bangumi_expand_info_url = build_mikan_bangumi_expand_info_url(
mikan_base_url.clone(),
&mikan_bangumi_id,
);
Some((
bangumi_title,
mikan_bangumi_id,
bangumi_expand_info_url,
origin_poster_src,
))
} else {
None
}
})
.collect_vec()
};
for (idx, (bangumi_title, mikan_bangumi_id, bangumi_expand_info_url, origin_poster_src)) in
bangumi_items.iter().enumerate()
{
if history.get(idx).is_some() {
continue;
} else if let Some((fansub_name, mikan_fansub_id)) = {
let bangumi_expand_info_content =
fetch_html(http_client, bangumi_expand_info_url.clone()).await?;
let bangumi_expand_info_fragment =
Html::parse_fragment(&bangumi_expand_info_content);
bangumi_expand_info_fragment
.select(fansub_container_selector)
.next()
.and_then(|fansub_info| {
if let (Some(fansub_name), Some(mikan_fansub_id)) = (
fansub_info
.select(fansub_title_selector)
.next()
.and_then(|ele| ele.attr("title"))
.map(String::from),
fansub_info
.select(fansub_id_selector)
.next()
.and_then(|ele| ele.attr("data-subtitlegroupid"))
.map(String::from),
) {
Some((fansub_name, mikan_fansub_id))
} else {
None
}
})
} {
tracing::trace!(fansub_name, mikan_fansub_id, "subscribed fansub extracted");
let item = MikanBangumiMeta {
homepage: build_mikan_bangumi_homepage(
mikan_base_url.clone(),
mikan_bangumi_id,
Some(&mikan_fansub_id),
),
bangumi_title: bangumi_title.to_string(),
@ -474,11 +492,28 @@ pub fn extract_mikan_bangumis_meta_from_my_bangumi_page(
fansub: Some(fansub_name),
origin_poster_src: origin_poster_src.clone(),
};
yield item;
}
}
}
}
impl StreamTaskRunnerTrait for ExtractMikanBangumisMetaFromMyBangumiTask {
fn run(
context: Arc<dyn AppContextTrait>,
request: &Self::Request,
history: &[Arc<RResult<Self::Item>>],
) -> impl Stream<Item = RResult<Self::Item>> {
let context = context.clone();
extract_mikan_bangumis_meta_from_my_bangumi_page(
context,
request.my_bangumi_page_url.clone(),
request.auth_securcy.clone(),
history,
)
}
}
#[cfg(test)]
mod test {
#![allow(unused_variables)]
@ -486,22 +521,19 @@ mod test {
use futures::{TryStreamExt, pin_mut};
use http::header;
use rstest::{fixture, rstest};
use secrecy::SecretString;
use tracing::Level;
use url::Url;
use zune_image::{codecs::ImageFormat, image::Image};
use super::*;
use crate::{
extract::mikan::{
MikanAuthSecrecy, web_extract::extract_mikan_bangumis_meta_from_my_bangumi_page,
},
test_utils::{mikan::build_testing_mikan_client, tracing::init_testing_tracing},
use crate::test_utils::{
app::UnitTestAppContext, mikan::build_testing_mikan_client,
tracing::try_init_testing_tracing,
};
#[fixture]
fn before_each() {
init_testing_tracing(Level::INFO);
try_init_testing_tracing(Level::INFO);
}
#[rstest]
@ -625,7 +657,11 @@ mod test {
let my_bangumi_page_url = mikan_base_url.join("/Home/MyBangumi")?;
let mikan_client = build_testing_mikan_client(mikan_base_url.clone()).await?;
let context = Arc::new(
UnitTestAppContext::builder()
.mikan(build_testing_mikan_client(mikan_base_url.clone()).await?)
.build(),
);
{
let my_bangumi_without_cookie_mock = mikan_server
@ -636,8 +672,10 @@ mod test {
.await;
let bangumi_metas = extract_mikan_bangumis_meta_from_my_bangumi_page(
&mikan_client,
context.clone(),
my_bangumi_page_url.clone(),
None,
&[],
);
pin_mut!(bangumi_metas);
@ -671,8 +709,8 @@ mod test {
.create_async()
.await;
let mikan_client_with_cookie = mikan_client.fork_with_auth(MikanAuthSecrecy {
cookie: SecretString::from(
let auth_secrecy = Some(MikanAuthSecrecy {
cookie: String::from(
"mikan-announcement=1; .AspNetCore.Antiforgery.abc=abc; \
.AspNetCore.Identity.Application=abc; ",
),
@ -680,11 +718,13 @@ mod test {
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like \
Gecko) Chrome/133.0.0.0 Safari/537.36 Edg/133.0.0.0",
)),
})?;
});
let bangumi_metas = extract_mikan_bangumis_meta_from_my_bangumi_page(
&mikan_client_with_cookie,
context.clone(),
my_bangumi_page_url,
auth_secrecy,
&[],
);
pin_mut!(bangumi_metas);
let bangumi_metas = bangumi_metas.try_collect::<Vec<_>>().await?;

View File

@ -2,12 +2,11 @@ use std::sync::Arc;
use cookie::Cookie;
use reqwest::{ClientBuilder, cookie::Jar};
use secrecy::zeroize::Zeroize;
use url::Url;
use crate::errors::RError;
pub trait HttpClientSecrecyDataTrait: Zeroize {
pub trait HttpClientSecrecyDataTrait {
fn attach_secrecy_to_client(&self, client_builder: ClientBuilder) -> ClientBuilder {
client_builder
}
@ -37,13 +36,6 @@ impl HttpClientCookiesAuth {
}
}
impl Zeroize for HttpClientCookiesAuth {
fn zeroize(&mut self) {
self.cookie_jar = Arc::new(Jar::default());
self.user_agent = None;
}
}
impl HttpClientSecrecyDataTrait for HttpClientCookiesAuth {
fn attach_secrecy_to_client(&self, client_builder: ClientBuilder) -> ClientBuilder {
let mut client_builder = client_builder.cookie_provider(self.cookie_jar.clone());

View File

@ -3,6 +3,7 @@ pub mod filter;
pub mod guard;
pub mod schema_root;
pub mod service;
pub mod subscriptions;
pub mod util;
pub use config::GraphQLConfig;

View File

@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use super::subscribers::{self, SEED_SUBSCRIBER};
use crate::{
app::AppContext,
app::AppContextTrait,
errors::{RError, RResult},
};
@ -57,8 +57,8 @@ impl Related<super::subscribers::Entity> for Entity {
impl ActiveModelBehavior for ActiveModel {}
impl Model {
pub async fn find_by_pid(ctx: &AppContext, pid: &str) -> RResult<Self> {
let db = &ctx.db;
pub async fn find_by_pid(ctx: &dyn AppContextTrait, pid: &str) -> RResult<Self> {
let db = ctx.db();
let subscriber_auth = Entity::find()
.filter(Column::Pid.eq(pid))
.one(db)
@ -67,8 +67,8 @@ impl Model {
Ok(subscriber_auth)
}
pub async fn create_from_oidc(ctx: &AppContext, sub: String) -> RResult<Self> {
let db = &ctx.db;
pub async fn create_from_oidc(ctx: &dyn AppContextTrait, sub: String) -> RResult<Self> {
let db = ctx.db();
let txn = db.begin().await?;

View File

@ -4,7 +4,7 @@ use sea_orm::{ActiveValue, FromJsonQueryResult, entity::prelude::*, sea_query::O
use serde::{Deserialize, Serialize};
use super::subscription_bangumi;
use crate::{app::AppContext, errors::RResult};
use crate::{app::AppContextTrait, errors::RResult};
#[derive(
Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult, SimpleObject,
@ -113,7 +113,7 @@ pub enum RelatedEntity {
impl Model {
pub async fn get_or_insert_from_mikan<F>(
ctx: &AppContext,
ctx: &dyn AppContextTrait,
subscriber_id: i32,
subscription_id: i32,
mikan_bangumi_id: String,
@ -123,7 +123,7 @@ impl Model {
where
F: AsyncFnOnce(&mut ActiveModel) -> RResult<()>,
{
let db = &ctx.db;
let db = ctx.db();
if let Some(existed) = Entity::find()
.filter(
Column::MikanBangumiId

View File

@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use super::{bangumi, query::InsertManyReturningExt, subscription_episode};
use crate::{
app::AppContext,
app::AppContextTrait,
errors::RResult,
extract::{
mikan::{MikanEpisodeMeta, build_mikan_episode_homepage},
@ -136,12 +136,12 @@ pub struct MikanEpsiodeCreation {
impl Model {
pub async fn add_episodes(
ctx: &AppContext,
ctx: &dyn AppContextTrait,
subscriber_id: i32,
subscription_id: i32,
creations: impl IntoIterator<Item = MikanEpsiodeCreation>,
) -> RResult<()> {
let db = &ctx.db;
let db = ctx.db();
let new_episode_active_modes = creations
.into_iter()
.map(|cr| ActiveModel::from_mikan_episode_meta(ctx, cr))
@ -189,7 +189,7 @@ impl Model {
impl ActiveModel {
pub fn from_mikan_episode_meta(
ctx: &AppContext,
ctx: &dyn AppContextTrait,
creation: MikanEpsiodeCreation,
) -> color_eyre::eyre::Result<Self> {
let item = creation.episode;
@ -201,7 +201,7 @@ impl ActiveModel {
.ok()
.unwrap_or_default();
let homepage =
build_mikan_episode_homepage(ctx.mikan.base_url().clone(), &item.mikan_episode_id);
build_mikan_episode_homepage(ctx.mikan().base_url().clone(), &item.mikan_episode_id);
Ok(Self {
mikan_episode_id: ActiveValue::Set(Some(item.mikan_episode_id)),

View File

@ -8,3 +8,5 @@ pub mod subscribers;
pub mod subscription_bangumi;
pub mod subscription_episode;
pub mod subscriptions;
pub mod task_stream_item;
pub mod tasks;

View File

@ -4,7 +4,7 @@ use sea_orm::{ActiveValue, FromJsonQueryResult, TransactionTrait, entity::prelud
use serde::{Deserialize, Serialize};
use crate::{
app::AppContext,
app::AppContextTrait,
errors::{RError, RResult},
};
@ -95,13 +95,13 @@ pub struct SubscriberIdParams {
impl ActiveModelBehavior for ActiveModel {}
impl Model {
pub async fn find_seed_subscriber_id(ctx: &AppContext) -> RResult<i32> {
pub async fn find_seed_subscriber_id(ctx: &dyn AppContextTrait) -> RResult<i32> {
let subscriber_auth = crate::models::auth::Model::find_by_pid(ctx, SEED_SUBSCRIBER).await?;
Ok(subscriber_auth.subscriber_id)
}
pub async fn find_by_id(ctx: &AppContext, id: i32) -> RResult<Self> {
let db = &ctx.db;
pub async fn find_by_id(ctx: &dyn AppContextTrait, id: i32) -> RResult<Self> {
let db = ctx.db();
let subscriber = Entity::find_by_id(id)
.one(db)
@ -110,8 +110,8 @@ impl Model {
Ok(subscriber)
}
pub async fn create_root(ctx: &AppContext) -> RResult<Self> {
let db = &ctx.db;
pub async fn create_root(ctx: &dyn AppContextTrait) -> RResult<Self> {
let db = ctx.db();
let txn = db.begin().await?;
let user = ActiveModel {

View File

@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use super::{bangumi, episodes, query::filter_values_in};
use crate::{
app::AppContext,
app::AppContextTrait,
errors::RResult,
extract::{
mikan::{
@ -179,22 +179,22 @@ impl ActiveModel {
impl Model {
pub async fn add_subscription(
ctx: &AppContext,
ctx: &dyn AppContextTrait,
create_dto: SubscriptionCreateDto,
subscriber_id: i32,
) -> RResult<Self> {
let db = &ctx.db;
let db = ctx.db();
let subscription = ActiveModel::from_create_dto(create_dto, subscriber_id);
Ok(subscription.insert(db).await?)
}
pub async fn toggle_with_ids(
ctx: &AppContext,
ctx: &dyn AppContextTrait,
ids: impl Iterator<Item = i32>,
enabled: bool,
) -> RResult<()> {
let db = &ctx.db;
let db = ctx.db();
Entity::update_many()
.col_expr(Column::Enabled, Expr::value(enabled))
.filter(Column::Id.is_in(ids))
@ -203,8 +203,11 @@ impl Model {
Ok(())
}
pub async fn delete_with_ids(ctx: &AppContext, ids: impl Iterator<Item = i32>) -> RResult<()> {
let db = &ctx.db;
pub async fn delete_with_ids(
ctx: &dyn AppContextTrait,
ids: impl Iterator<Item = i32>,
) -> RResult<()> {
let db = ctx.db();
Entity::delete_many()
.filter(Column::Id.is_in(ids))
.exec(db)
@ -212,16 +215,16 @@ impl Model {
Ok(())
}
pub async fn pull_subscription(&self, ctx: &AppContext) -> RResult<()> {
pub async fn pull_subscription(&self, ctx: &dyn AppContextTrait) -> RResult<()> {
match &self.category {
SubscriptionCategory::Mikan => {
let mikan_client = &ctx.mikan;
let mikan_client = ctx.mikan();
let channel =
extract_mikan_rss_channel_from_rss_link(mikan_client, &self.source_url).await?;
let items = channel.into_items();
let db = &ctx.db;
let db = ctx.db();
let items = items.into_iter().collect_vec();
let mut stmt = filter_values_in(
@ -266,7 +269,7 @@ impl Model {
for ((mikan_bangumi_id, mikan_fansub_id), new_ep_metas) in new_mikan_bangumi_groups
{
let mikan_base_url = ctx.mikan.base_url();
let mikan_base_url = ctx.mikan().base_url();
let bgm_homepage = build_mikan_bangumi_homepage(
mikan_base_url.clone(),
&mikan_bangumi_id,

View File

@ -0,0 +1,62 @@
use async_trait::async_trait;
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(
Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")]
#[serde(rename_all = "snake_case")]
pub enum TaskStatus {
#[sea_orm(string_value = "r")]
Running,
#[sea_orm(string_value = "s")]
Success,
#[sea_orm(string_value = "f")]
Failed,
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "tasks")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub task_id: i32,
pub subscriber_id: i32,
pub item: serde_json::Value,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::subscribers::Entity",
from = "Column::SubscriberId",
to = "super::subscribers::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
Subscriber,
#[sea_orm(
belongs_to = "super::tasks::Entity",
from = "Column::TaskId",
to = "super::tasks::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
Task,
}
impl Related<super::subscribers::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscriber.def()
}
}
impl Related<super::tasks::Entity> for Entity {
fn to() -> RelationDef {
Relation::Task.def()
}
}
#[async_trait]
impl ActiveModelBehavior for ActiveModel {}

View File

@ -0,0 +1,95 @@
use async_trait::async_trait;
use sea_orm::{QuerySelect, entity::prelude::*};
use serde::{Deserialize, Serialize};
use crate::{app::AppContextTrait, errors::RResult};
#[derive(
Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")]
#[serde(rename_all = "snake_case")]
pub enum TaskStatus {
#[sea_orm(string_value = "p")]
Pending,
#[sea_orm(string_value = "r")]
Running,
#[sea_orm(string_value = "s")]
Success,
#[sea_orm(string_value = "f")]
Failed,
}
#[derive(
Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")]
#[serde(rename_all = "snake_case")]
pub enum TaskMode {
#[sea_orm(string_value = "stream")]
Stream,
#[sea_orm(string_value = "future")]
Future,
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "tasks")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub subscriber_id: i32,
pub task_mode: TaskMode,
pub task_status: TaskStatus,
pub task_type: String,
pub state_data: serde_json::Value,
pub request_data: serde_json::Value,
pub error_data: serde_json::Value,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::task_stream_item::Entity")]
StreamItem,
#[sea_orm(
belongs_to = "super::subscribers::Entity",
from = "Column::SubscriberId",
to = "super::subscribers::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
Subscriber,
}
impl Related<super::subscribers::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscriber.def()
}
}
impl Related<super::task_stream_item::Entity> for Entity {
fn to() -> RelationDef {
Relation::StreamItem.def()
}
}
impl Model {
pub async fn find_stream_task_by_id(
ctx: &dyn AppContextTrait,
task_id: i32,
) -> RResult<Option<(Model, Vec<super::task_stream_item::Model>)>> {
let db = ctx.db();
let res = Entity::find()
.filter(Column::Id.eq(task_id))
.filter(Column::TaskMode.eq(TaskMode::Stream))
.find_with_related(super::task_stream_item::Entity)
.limit(1)
.all(db)
.await?
.pop();
Ok(res)
}
}
#[async_trait]
impl ActiveModelBehavior for ActiveModel {}

View File

@ -1,16 +1,277 @@
use std::borrow::Cow;
use std::{borrow::Cow, sync::Arc};
use async_trait::async_trait;
use async_stream::stream;
use futures::{Stream, StreamExt, pin_mut};
use serde::{Serialize, de::DeserializeOwned};
use tokio::sync::{RwLock, mpsc};
use crate::{app::AppContext, errors::RResult};
use crate::{
app::AppContextTrait,
errors::{RError, RResult},
models,
};
pub struct TaskVars {}
#[async_trait]
pub trait Task: Send + Sync {
fn task_name() -> Cow<'static, str>;
fn task_id(&self) -> &str;
async fn run(&self, app_context: &AppContext, vars: &TaskVars) -> RResult<()>;
pub struct TaskMeta {
pub subscriber_id: i32,
pub task_id: i32,
pub task_kind: Cow<'static, str>,
}
pub struct ReplayChannel<T: Send + Sync + Clone + 'static> {
sender: mpsc::UnboundedSender<T>,
channels: Arc<RwLock<Vec<mpsc::UnboundedSender<T>>>>,
buffer: Arc<RwLock<Vec<T>>>,
}
impl<T: Send + Sync + Clone + 'static> ReplayChannel<T> {
pub fn new(history: Vec<T>) -> Self {
let (tx, mut rx) = mpsc::unbounded_channel::<T>();
let channels = Arc::new(RwLock::new(Vec::<mpsc::UnboundedSender<T>>::new()));
let buffer = Arc::new(RwLock::new(history));
{
let channels = channels.clone();
let buffer = buffer.clone();
tokio::spawn(async move {
loop {
match rx.recv().await {
Some(value) => {
let mut w = buffer.write().await;
let senders = channels.read().await;
for s in senders.iter() {
if !s.is_closed() {
if let Err(err) = s.send(value.clone()) {
tracing::error!(err = %err, "replay-channel broadcast to other subscribers error");
}
}
}
w.push(value);
}
None => {
drop(rx);
let mut cs = channels.write().await;
cs.clear();
break;
}
}
}
});
}
Self {
sender: tx,
channels,
buffer,
}
}
pub fn sender(&self) -> &mpsc::UnboundedSender<T> {
&self.sender
}
pub async fn receiver(&self) -> mpsc::UnboundedReceiver<T> {
let (tx, rx) = mpsc::unbounded_channel();
let items = self.buffer.read().await;
for item in items.iter() {
if let Err(err) = tx.send(item.clone()) {
tracing::error!(err = %err, "replay-channel send replay value to other subscribers error");
}
}
if !self.sender.is_closed() {
let mut sw = self.channels.write().await;
sw.push(tx);
}
rx
}
pub async fn close(&self) {
let mut senders = self.channels.write().await;
senders.clear();
}
}
pub trait StreamTaskCoreTrait: Sized {
type Request: Serialize + DeserializeOwned;
type Item: Serialize + DeserializeOwned;
fn task_id(&self) -> i32;
fn task_kind(&self) -> &str;
fn new(meta: TaskMeta, request: Self::Request) -> Self;
fn request(&self) -> &Self::Request;
}
pub trait StreamTaskReplayLayoutTrait: StreamTaskCoreTrait {
fn history(&self) -> &[Arc<RResult<Self::Item>>];
fn resume_from_model(
task: models::tasks::Model,
stream_items: Vec<models::task_stream_item::Model>,
) -> RResult<Self>;
fn running_receiver(
&self,
) -> impl Future<Output = Option<mpsc::UnboundedReceiver<Arc<RResult<Self::Item>>>>>;
#[allow(clippy::type_complexity)]
fn init_receiver(
&self,
) -> impl Future<
Output = (
mpsc::UnboundedSender<Arc<RResult<Self::Item>>>,
mpsc::UnboundedReceiver<Arc<RResult<Self::Item>>>,
),
>;
fn serialize_request(request: Self::Request) -> RResult<serde_json::Value> {
serde_json::to_value(request).map_err(RError::from)
}
fn serialize_item(item: RResult<Self::Item>) -> RResult<serde_json::Value> {
serde_json::to_value(item).map_err(RError::from)
}
fn deserialize_request(request: serde_json::Value) -> RResult<Self::Request> {
serde_json::from_value(request).map_err(RError::from)
}
fn deserialize_item(item: serde_json::Value) -> RResult<RResult<Self::Item>> {
serde_json::from_value(item).map_err(RError::from)
}
}
pub trait StreamTaskRunnerTrait: StreamTaskCoreTrait {
fn run(
context: Arc<dyn AppContextTrait>,
request: &Self::Request,
history: &[Arc<RResult<Self::Item>>],
) -> impl Stream<Item = RResult<Self::Item>>;
}
pub trait StreamTaskReplayRunnerTrait: StreamTaskRunnerTrait + StreamTaskReplayLayoutTrait {
fn run_shared(
&self,
context: Arc<dyn AppContextTrait>,
) -> impl Stream<Item = Arc<RResult<Self::Item>>> {
stream! {
if let Some(mut receiver) = self.running_receiver().await {
while let Some(item) = receiver.recv().await {
yield item
}
} else {
let (tx, _) = self.init_receiver().await;
let stream = Self::run(context, self.request(), self.history());
pin_mut!(stream);
while let Some(item) = stream.next().await {
let item = Arc::new(item);
if let Err(err) = tx.send(item.clone()) {
tracing::error!(task_id = self.task_id(), task_kind = self.task_kind(), err = %err, "run shared send error");
}
yield item
}
};
}
}
}
pub struct StandardStreamTaskReplayLayout<Request, Item>
where
Request: Serialize + DeserializeOwned,
Item: Serialize + DeserializeOwned + Sync + Send + 'static,
{
pub meta: TaskMeta,
pub request: Request,
pub history: Vec<Arc<RResult<Item>>>,
#[allow(clippy::type_complexity)]
pub channel: Arc<RwLock<Option<ReplayChannel<Arc<RResult<Item>>>>>>,
}
impl<Request, Item> StreamTaskCoreTrait for StandardStreamTaskReplayLayout<Request, Item>
where
Request: Serialize + DeserializeOwned,
Item: Serialize + DeserializeOwned + Sync + Send + 'static,
{
type Request = Request;
type Item = Item;
fn task_id(&self) -> i32 {
self.meta.task_id
}
fn request(&self) -> &Self::Request {
&self.request
}
fn task_kind(&self) -> &str {
&self.meta.task_kind
}
fn new(meta: TaskMeta, request: Self::Request) -> Self {
Self {
meta,
request,
history: vec![],
channel: Arc::new(RwLock::new(None)),
}
}
}
impl<Request, Item> StreamTaskReplayLayoutTrait for StandardStreamTaskReplayLayout<Request, Item>
where
Request: Serialize + DeserializeOwned,
Item: Serialize + DeserializeOwned + Sync + Send + 'static,
{
fn history(&self) -> &[Arc<RResult<Self::Item>>] {
&self.history
}
fn resume_from_model(
task: models::tasks::Model,
stream_items: Vec<models::task_stream_item::Model>,
) -> RResult<Self> {
Ok(Self {
meta: TaskMeta {
task_id: task.id,
subscriber_id: task.subscriber_id,
task_kind: Cow::Owned(task.task_type),
},
request: Self::deserialize_request(task.request_data)?,
history: stream_items
.into_iter()
.map(|m| Self::deserialize_item(m.item).map(Arc::new))
.collect::<RResult<Vec<_>>>()?,
channel: Arc::new(RwLock::new(None)),
})
}
async fn running_receiver(&self) -> Option<mpsc::UnboundedReceiver<Arc<RResult<Self::Item>>>> {
if let Some(channel) = self.channel.read().await.as_ref() {
Some(channel.receiver().await)
} else {
None
}
}
async fn init_receiver(
&self,
) -> (
mpsc::UnboundedSender<Arc<RResult<Self::Item>>>,
mpsc::UnboundedReceiver<Arc<RResult<Self::Item>>>,
) {
let channel = ReplayChannel::new(self.history.clone());
let rx = channel.receiver().await;
let sender = channel.sender().clone();
{
{
let mut w = self.channel.write().await;
*w = Some(channel);
}
}
(sender, rx)
}
}

View File

@ -1,49 +0,0 @@
use std::borrow::Cow;
use futures::{TryStreamExt, pin_mut};
use super::core::{Task, TaskVars};
use crate::{
app::AppContext,
errors::RResult,
extract::mikan::{
MikanAuthSecrecy, web_extract::extract_mikan_bangumis_meta_from_my_bangumi_page,
},
};
#[derive(Debug)]
pub struct CreateMikanRSSFromMyBangumiTask {
pub subscriber_id: i32,
pub task_id: String,
pub auth_secrecy: MikanAuthSecrecy,
}
#[async_trait::async_trait]
impl Task for CreateMikanRSSFromMyBangumiTask {
fn task_name() -> Cow<'static, str> {
Cow::Borrowed("create-mikan-rss-from-my-bangumi")
}
fn task_id(&self) -> &str {
&self.task_id
}
async fn run(&self, app_context: &AppContext, _vars: &TaskVars) -> RResult<()> {
let mikan_client = app_context
.mikan
.fork_with_auth(self.auth_secrecy.clone())?;
{
let bangumi_metas = extract_mikan_bangumis_meta_from_my_bangumi_page(
&mikan_client,
mikan_client.base_url().join("/Home/MyBangumi")?,
);
pin_mut!(bangumi_metas);
let _bangumi_metas = bangumi_metas.try_collect::<Vec<_>>().await?;
}
Ok(())
}
}

View File

@ -0,0 +1,38 @@
// use std::borrow::Cow;
// use futures::{TryStreamExt, pin_mut};
// use crate::{
// app::AppContextTrait,
// errors::RResult,
// extract::mikan::{
// MikanAuthSecrecy,
// web_extract::extract_mikan_bangumis_meta_from_my_bangumi_page, },
// tasks::core::{StreamTaskTrait, TaskVars},
// };
// #[derive(Debug)]
// pub struct CreateMikanRSSFromMyBangumiTask {
// pub subscriber_id: i32,
// pub task_id: String,
// pub auth_secrecy: MikanAuthSecrecy,
// }
// async fn run(app_context: &dyn AppContextTrait, _vars: &TaskVars) ->
// RResult<()> { let mikan_client = app_context
// .mikan
// .fork_with_auth(todo!().auth_secrecy.clone())?;
// {
// let bangumi_metas = extract_mikan_bangumis_meta_from_my_bangumi_page(
// &mikan_client,
// mikan_client.base_url().join("/Home/MyBangumi")?,
// );
// pin_mut!(bangumi_metas);
// let _bangumi_metas = bangumi_metas.try_collect::<Vec<_>>().await?;
// }
// Ok(())
// }

View File

@ -0,0 +1 @@
pub mod create_mikan_bangumi_subscriptions_from_my_bangumi_page;

View File

@ -1,2 +1,4 @@
pub mod core;
pub mod create_mikan_bangumi_subscriptions_from_my_bangumi_page;
pub mod mikan;
pub mod service;
pub mod registry;

View File

@ -0,0 +1 @@

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,62 @@
use typed_builder::TypedBuilder;
use crate::app::AppContextTrait;
#[derive(TypedBuilder)]
#[builder(field_defaults(default, setter(strip_option)))]
pub struct UnitTestAppContext {
logger: Option<crate::logger::LoggerService>,
db: Option<crate::database::DatabaseService>,
config: Option<crate::app::AppConfig>,
cache: Option<crate::cache::CacheService>,
mikan: Option<crate::extract::mikan::MikanClient>,
auth: Option<crate::auth::AuthService>,
graphql: Option<crate::graphql::GraphQLService>,
storage: Option<crate::storage::StorageService>,
#[builder(default = Some(String::from(env!("CARGO_MANIFEST_DIR"))))]
working_dir: Option<String>,
#[builder(default = crate::app::Environment::Testing, setter(!strip_option))]
environment: crate::app::Environment,
}
impl AppContextTrait for UnitTestAppContext {
fn logger(&self) -> &crate::logger::LoggerService {
self.logger.as_ref().expect("should set logger")
}
fn db(&self) -> &crate::database::DatabaseService {
self.db.as_ref().expect("should set db")
}
fn config(&self) -> &crate::app::AppConfig {
self.config.as_ref().expect("should set config")
}
fn cache(&self) -> &crate::cache::CacheService {
self.cache.as_ref().expect("should set cache")
}
fn mikan(&self) -> &crate::extract::mikan::MikanClient {
self.mikan.as_ref().expect("should set mikan")
}
fn auth(&self) -> &crate::auth::AuthService {
self.auth.as_ref().expect("should set auth")
}
fn graphql(&self) -> &crate::graphql::GraphQLService {
self.graphql.as_ref().expect("should set graphql")
}
fn storage(&self) -> &crate::storage::StorageService {
self.storage.as_ref().expect("should set storage")
}
fn environment(&self) -> &crate::app::Environment {
&self.environment
}
fn working_dir(&self) -> &String {
self.working_dir.as_ref().expect("should set working_dir")
}
}

View File

@ -1,3 +1,4 @@
pub mod app;
pub mod fetch;
pub mod mikan;
#[cfg(feature = "testcontainers")]

View File

@ -1,10 +1,10 @@
use tracing::Level;
use tracing_subscriber::EnvFilter;
pub fn init_testing_tracing(level: Level) {
pub fn try_init_testing_tracing(level: Level) {
let crate_name = env!("CARGO_PKG_NAME");
let level = level.as_str().to_lowercase();
let filter = EnvFilter::new(format!("{}[]={}", crate_name, level))
.add_directive(format!("mockito[]={}", level).parse().unwrap());
tracing_subscriber::fmt().with_env_filter(filter).init();
let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
}

View File

@ -2,19 +2,23 @@ use std::{borrow::Cow, sync::Arc};
use axum::Router;
use crate::app::AppContext;
use crate::app::AppContextTrait;
pub trait ControllerTrait: Sized {
fn apply_to(self, router: Router<Arc<AppContext>>) -> Router<Arc<AppContext>>;
fn apply_to(self, router: Router<Arc<dyn AppContextTrait>>)
-> Router<Arc<dyn AppContextTrait>>;
}
pub struct PrefixController {
prefix: Cow<'static, str>,
router: Router<Arc<AppContext>>,
router: Router<Arc<dyn AppContextTrait>>,
}
impl PrefixController {
pub fn new(prefix: impl Into<Cow<'static, str>>, router: Router<Arc<AppContext>>) -> Self {
pub fn new(
prefix: impl Into<Cow<'static, str>>,
router: Router<Arc<dyn AppContextTrait>>,
) -> Self {
Self {
prefix: prefix.into(),
router,
@ -23,7 +27,10 @@ impl PrefixController {
}
impl ControllerTrait for PrefixController {
fn apply_to(self, router: Router<Arc<AppContext>>) -> Router<Arc<AppContext>> {
fn apply_to(
self,
router: Router<Arc<dyn AppContextTrait>>,
) -> Router<Arc<dyn AppContextTrait>> {
router.nest(&self.prefix, self.router)
}
}
@ -35,14 +42,17 @@ pub enum Controller {
impl Controller {
pub fn from_prefix(
prefix: impl Into<Cow<'static, str>>,
router: Router<Arc<AppContext>>,
router: Router<Arc<dyn AppContextTrait>>,
) -> Self {
Self::Prefix(PrefixController::new(prefix, router))
}
}
impl ControllerTrait for Controller {
fn apply_to(self, router: Router<Arc<AppContext>>) -> Router<Arc<AppContext>> {
fn apply_to(
self,
router: Router<Arc<dyn AppContextTrait>>,
) -> Router<Arc<dyn AppContextTrait>> {
match self {
Self::Prefix(p) => p.apply_to(router),
}

View File

@ -5,7 +5,7 @@ use axum::{Extension, Router, extract::State, middleware::from_fn_with_state, ro
use super::core::Controller;
use crate::{
app::AppContext,
app::AppContextTrait,
auth::{AuthUserInfo, header_www_authenticate_middleware},
errors::RResult,
};
@ -13,11 +13,11 @@ use crate::{
pub const CONTROLLER_PREFIX: &str = "/api/graphql";
async fn graphql_handler(
State(ctx): State<Arc<AppContext>>,
State(ctx): State<Arc<dyn AppContextTrait>>,
Extension(auth_user_info): Extension<AuthUserInfo>,
req: GraphQLRequest,
) -> GraphQLResponse {
let graphql_service = &ctx.graphql;
let graphql_service = ctx.graphql();
let mut req = req.into_inner();
req = req.data(auth_user_info);
@ -25,8 +25,8 @@ async fn graphql_handler(
graphql_service.schema.execute(req).await.into()
}
pub async fn create(ctx: Arc<AppContext>) -> RResult<Controller> {
let router = Router::<Arc<AppContext>>::new()
pub async fn create(ctx: Arc<dyn AppContextTrait>) -> RResult<Controller> {
let router = Router::<Arc<dyn AppContextTrait>>::new()
.route("/", post(graphql_handler))
.layer(from_fn_with_state(ctx, header_www_authenticate_middleware));
Ok(Controller::from_prefix(CONTROLLER_PREFIX, router))

View File

@ -9,7 +9,7 @@ use axum::{
use super::core::Controller;
use crate::{
app::AppContext,
app::AppContextTrait,
auth::{
AuthError, AuthService, AuthServiceTrait,
oidc::{OidcAuthCallbackPayload, OidcAuthCallbackQuery, OidcAuthRequest},
@ -22,10 +22,10 @@ use crate::{
pub const CONTROLLER_PREFIX: &str = "/api/oidc";
async fn oidc_callback(
State(ctx): State<Arc<AppContext>>,
State(ctx): State<Arc<dyn AppContextTrait>>,
Query(query): Query<OidcAuthCallbackQuery>,
) -> Result<Json<OidcAuthCallbackPayload>, AuthError> {
let auth_service = &ctx.auth;
let auth_service = ctx.auth();
if let AuthService::Oidc(oidc_auth_service) = auth_service {
let response = oidc_auth_service
.extract_authorization_request_callback(query)
@ -40,10 +40,10 @@ async fn oidc_callback(
}
async fn oidc_auth(
State(ctx): State<Arc<AppContext>>,
State(ctx): State<Arc<dyn AppContextTrait>>,
parts: Parts,
) -> Result<Json<OidcAuthRequest>, AuthError> {
let auth_service = &ctx.auth;
let auth_service = ctx.auth();
if let AuthService::Oidc(oidc_auth_service) = auth_service {
let mut redirect_uri = ForwardedRelatedInfo::from_request_parts(&parts)
.resolved_origin()
@ -70,8 +70,8 @@ async fn oidc_auth(
}
}
pub async fn create(_context: Arc<AppContext>) -> RResult<Controller> {
let router = Router::<Arc<AppContext>>::new()
pub async fn create(_context: Arc<dyn AppContextTrait>) -> RResult<Controller> {
let router = Router::<Arc<dyn AppContextTrait>>::new()
.route("/auth", get(oidc_auth))
.route("/callback", get(oidc_callback));

View File

@ -12,7 +12,7 @@ use http::StatusCode;
use serde::{Deserialize, Serialize};
use tower_http::catch_panic::CatchPanicLayer;
use crate::{app::AppContext, errors::RResult, web::middleware::MiddlewareLayer};
use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct CatchPanic {
@ -52,7 +52,10 @@ impl MiddlewareLayer for CatchPanic {
}
/// Applies the Catch Panic middleware layer to the Axum router.
fn apply(&self, app: Router<Arc<AppContext>>) -> RResult<Router<Arc<AppContext>>> {
fn apply(
&self,
app: Router<Arc<dyn AppContextTrait>>,
) -> RResult<Router<Arc<dyn AppContextTrait>>> {
Ok(app.layer(CatchPanicLayer::custom(handle_panic)))
}
}

View File

@ -11,7 +11,7 @@ use axum::Router;
use serde::{Deserialize, Serialize};
use tower_http::compression::CompressionLayer;
use crate::{app::AppContext, errors::RResult, web::middleware::MiddlewareLayer};
use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Compression {
@ -35,7 +35,10 @@ impl MiddlewareLayer for Compression {
}
/// Applies the Compression middleware layer to the Axum router.
fn apply(&self, app: Router<Arc<AppContext>>) -> RResult<Router<Arc<AppContext>>> {
fn apply(
&self,
app: Router<Arc<dyn AppContextTrait>>,
) -> RResult<Router<Arc<dyn AppContextTrait>>> {
Ok(app.layer(CompressionLayer::new()))
}
}

View File

@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use tower_http::cors::{self, Any};
use crate::{app::AppContext, web::middleware::MiddlewareLayer, errors::RResult};
use crate::{app::AppContextTrait, web::middleware::MiddlewareLayer, errors::RResult};
/// CORS middleware configuration
#[derive(Debug, Clone, Deserialize, Serialize)]
@ -157,7 +157,7 @@ impl MiddlewareLayer for Cors {
}
/// Applies the CORS middleware layer to the Axum router.
fn apply(&self, app: Router<Arc<AppContext>>) -> RResult<Router<Arc<AppContext>>> {
fn apply(&self, app: Router<Arc<dyn AppContextTrait>>) -> RResult<Router<Arc<dyn AppContextTrait>>> {
Ok(app.layer(self.cors()?))
}
}

View File

@ -25,7 +25,7 @@ use futures_util::future::BoxFuture;
use serde::{Deserialize, Serialize};
use tower::{Layer, Service};
use crate::{app::AppContext, errors::RResult, web::middleware::MiddlewareLayer};
use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Etag {
@ -49,7 +49,10 @@ impl MiddlewareLayer for Etag {
}
/// Applies the `ETag` middleware to the application router.
fn apply(&self, app: Router<Arc<AppContext>>) -> RResult<Router<Arc<AppContext>>> {
fn apply(
&self,
app: Router<Arc<dyn AppContextTrait>>,
) -> RResult<Router<Arc<dyn AppContextTrait>>> {
Ok(app.layer(EtagLayer))
}
}

View File

@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
use tower_http::{add_extension::AddExtensionLayer, trace::TraceLayer};
use crate::{
app::{AppContext, Environment},
app::{AppContextTrait, Environment},
errors::RResult,
web::middleware::{MiddlewareLayer, request_id::LocoRequestId},
};
@ -35,10 +35,10 @@ pub struct Middleware {
/// Creates a new instance of [`Middleware`] by cloning the [`Config`]
/// configuration.
#[must_use]
pub fn new(config: &Config, context: Arc<AppContext>) -> Middleware {
pub fn new(config: &Config, context: Arc<dyn AppContextTrait>) -> Middleware {
Middleware {
config: config.clone(),
environment: context.environment.clone(),
environment: context.environment().clone(),
}
}
@ -67,7 +67,10 @@ impl MiddlewareLayer for Middleware {
/// The `TraceLayer` is customized with `make_span_with` to extract
/// request-specific details like method, URI, version, user agent, and
/// request ID, then create a tracing span for the request.
fn apply(&self, app: Router<Arc<AppContext>>) -> RResult<Router<Arc<AppContext>>> {
fn apply(
&self,
app: Router<Arc<dyn AppContextTrait>>,
) -> RResult<Router<Arc<dyn AppContextTrait>>> {
Ok(app
.layer(
TraceLayer::new_for_http().make_span_with(|request: &http::Request<_>| {

View File

@ -14,7 +14,7 @@ use std::sync::Arc;
use axum::Router;
use serde::{Deserialize, Serialize};
use crate::{app::AppContext, errors::RResult};
use crate::{app::AppContextTrait, errors::RResult};
/// Trait representing the behavior of middleware components in the application.
/// When implementing a new middleware, make sure to go over this checklist:
@ -52,14 +52,17 @@ pub trait MiddlewareLayer {
/// # Errors
///
/// If there is an issue when adding the middleware to the router.
fn apply(&self, app: Router<Arc<AppContext>>) -> RResult<Router<Arc<AppContext>>>;
fn apply(
&self,
app: Router<Arc<dyn AppContextTrait>>,
) -> RResult<Router<Arc<dyn AppContextTrait>>>;
}
#[allow(clippy::unnecessary_lazy_evaluations)]
#[must_use]
pub fn default_middleware_stack(ctx: Arc<AppContext>) -> Vec<Box<dyn MiddlewareLayer>> {
pub fn default_middleware_stack(ctx: Arc<dyn AppContextTrait>) -> Vec<Box<dyn MiddlewareLayer>> {
// Shortened reference to middlewares
let middlewares = &ctx.config.server.middlewares;
let middlewares = &ctx.config().server.middlewares;
vec![
// CORS middleware with a default if none

View File

@ -31,7 +31,7 @@ use tower::{Layer, Service};
use tracing::error;
use crate::{
app::AppContext,
app::AppContextTrait,
errors::{RError, RResult},
web::middleware::MiddlewareLayer,
};
@ -123,7 +123,10 @@ impl MiddlewareLayer for RemoteIpMiddleware {
}
/// Applies the Remote IP middleware to the given Axum router.
fn apply(&self, app: Router<Arc<AppContext>>) -> RResult<Router<Arc<AppContext>>> {
fn apply(
&self,
app: Router<Arc<dyn AppContextTrait>>,
) -> RResult<Router<Arc<dyn AppContextTrait>>> {
Ok(app.layer(RemoteIPLayer::new(self)?))
}
}

View File

@ -11,7 +11,7 @@ use regex::Regex;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::{web::middleware::MiddlewareLayer, app::AppContext, errors::RResult};
use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer};
const X_REQUEST_ID: &str = "x-request-id";
const MAX_LEN: usize = 255;
@ -52,7 +52,10 @@ impl MiddlewareLayer for RequestId {
///
/// # Errors
/// This function returns an error if the middleware cannot be applied.
fn apply(&self, app: Router<Arc<AppContext>>) -> RResult<Router<Arc<AppContext>>> {
fn apply(
&self,
app: Router<Arc<dyn AppContextTrait>>,
) -> RResult<Router<Arc<dyn AppContextTrait>>> {
Ok(app.layer(axum::middleware::from_fn(request_id_middleware)))
}
}

View File

@ -21,7 +21,7 @@ use serde_json::{self, json};
use tower::{Layer, Service};
use crate::{
app::AppContext,
app::AppContextTrait,
web::middleware::MiddlewareLayer,
errors::{RError, RResult},
};
@ -115,7 +115,7 @@ impl MiddlewareLayer for SecureHeader {
}
/// Applies the secure headers layer to the application router
fn apply(&self, app: Router<Arc<AppContext>>) -> RResult<Router<Arc<AppContext>>> {
fn apply(&self, app: Router<Arc<dyn AppContextTrait>>) -> RResult<Router<Arc<dyn AppContextTrait>>> {
Ok(app.layer(SecureHeaders::new(self)?))
}
}

View File

@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use tower_http::timeout::TimeoutLayer;
use crate::{app::AppContext, errors::RResult, web::middleware::MiddlewareLayer};
use crate::{app::AppContextTrait, errors::RResult, web::middleware::MiddlewareLayer};
/// Timeout middleware configuration
#[derive(Debug, Clone, Deserialize, Serialize)]
@ -58,7 +58,10 @@ impl MiddlewareLayer for TimeOut {
/// This method wraps the provided [`AXRouter`] in a [`TimeoutLayer`],
/// ensuring that requests exceeding the specified timeout duration will
/// be interrupted.
fn apply(&self, app: Router<Arc<AppContext>>) -> RResult<Router<Arc<AppContext>>> {
fn apply(
&self,
app: Router<Arc<dyn AppContextTrait>>,
) -> RResult<Router<Arc<dyn AppContextTrait>>> {
Ok(app.layer(TimeoutLayer::new(Duration::from_millis(self.timeout))))
}
}