feature: rewrite season subscription extractor

This commit is contained in:
2025-05-02 02:23:23 +08:00
parent 4301f1dbab
commit dbded94324
51 changed files with 8181 additions and 6035 deletions

View File

@@ -1,5 +1,3 @@
use std::sync::Arc;
use clap::{Parser, command};
use super::{AppContext, core::App, env::Environment};
@@ -83,9 +81,8 @@ impl AppBuilder {
)
.await?;
let app_context = Arc::new(
AppContext::new(self.environment.clone(), config, self.working_dir.clone()).await?,
);
let app_context =
AppContext::new(self.environment.clone(), config, self.working_dir.clone()).await?;
Ok(App {
context: app_context,

View File

@@ -16,3 +16,7 @@ depth_limit = inf
complexity_limit = inf
[cache]
[crypto]
[task]

View File

@@ -9,9 +9,9 @@ use serde::{Deserialize, Serialize};
use super::env::Environment;
use crate::{
auth::AuthConfig, cache::CacheConfig, database::DatabaseConfig, errors::RecorderResult,
extract::mikan::MikanConfig, graphql::GraphQLConfig, logger::LoggerConfig,
storage::StorageConfig, web::WebServerConfig,
auth::AuthConfig, cache::CacheConfig, crypto::CryptoConfig, database::DatabaseConfig,
errors::RecorderResult, extract::mikan::MikanConfig, graphql::GraphQLConfig,
logger::LoggerConfig, storage::StorageConfig, tasks::TaskConfig, web::WebServerConfig,
};
const DEFAULT_CONFIG_MIXIN: &str = include_str!("./default_mixin.toml");
@@ -24,9 +24,11 @@ pub struct AppConfig {
pub auth: AuthConfig,
pub storage: StorageConfig,
pub mikan: MikanConfig,
pub crypto: CryptoConfig,
pub graphql: GraphQLConfig,
pub logger: LoggerConfig,
pub database: DatabaseConfig,
pub tasks: TaskConfig,
}
impl AppConfig {

View File

@@ -1,11 +1,15 @@
use std::{fmt::Debug, sync::Arc};
use tokio::sync::OnceCell;
use super::{Environment, config::AppConfig};
use crate::{
auth::AuthService, cache::CacheService, database::DatabaseService, errors::RecorderResult,
extract::mikan::MikanClient, graphql::GraphQLService, logger::LoggerService,
storage::StorageService,
auth::AuthService, cache::CacheService, crypto::CryptoService, database::DatabaseService,
errors::RecorderResult, extract::mikan::MikanClient, graphql::GraphQLService,
logger::LoggerService, storage::StorageService, tasks::TaskService,
};
pub trait AppContextTrait: Send + Sync {
pub trait AppContextTrait: Send + Sync + Debug {
fn logger(&self) -> &LoggerService;
fn db(&self) -> &DatabaseService;
fn config(&self) -> &AppConfig;
@@ -16,6 +20,8 @@ pub trait AppContextTrait: Send + Sync {
fn storage(&self) -> &StorageService;
fn working_dir(&self) -> &String;
fn environment(&self) -> &Environment;
fn crypto(&self) -> &CryptoService;
fn task(&self) -> &TaskService;
}
pub struct AppContext {
@@ -27,8 +33,10 @@ pub struct AppContext {
auth: AuthService,
graphql: GraphQLService,
storage: StorageService,
crypto: CryptoService,
working_dir: String,
environment: Environment,
task: OnceCell<TaskService>,
}
impl AppContext {
@@ -36,7 +44,7 @@ impl AppContext {
environment: Environment,
config: AppConfig,
working_dir: impl ToString,
) -> RecorderResult<Self> {
) -> RecorderResult<Arc<Self>> {
let config_cloned = config.clone();
let logger = LoggerService::from_config(config.logger).await?;
@@ -45,9 +53,10 @@ impl AppContext {
let storage = StorageService::from_config(config.storage).await?;
let auth = AuthService::from_conf(config.auth).await?;
let mikan = MikanClient::from_config(config.mikan).await?;
let crypto = CryptoService::from_config(config.crypto).await?;
let graphql = GraphQLService::from_config_and_database(config.graphql, db.clone()).await?;
Ok(AppContext {
let ctx = Arc::new(AppContext {
config: config_cloned,
environment,
logger,
@@ -58,9 +67,26 @@ impl AppContext {
mikan,
working_dir: working_dir.to_string(),
graphql,
})
crypto,
task: OnceCell::new(),
});
ctx.task
.get_or_try_init(async || {
TaskService::from_config_and_ctx(config.tasks, ctx.clone()).await
})
.await?;
Ok(ctx)
}
}
/// Opaque `Debug` representation: only the type name is printed, so none of
/// the services or configuration held by the context end up in logs.
impl Debug for AppContext {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("AppContext")
    }
}
impl AppContextTrait for AppContext {
fn logger(&self) -> &LoggerService {
&self.logger
@@ -92,4 +118,10 @@ impl AppContextTrait for AppContext {
/// Returns the `Environment` this context was constructed with.
fn environment(&self) -> &Environment {
&self.environment
}
/// Returns the crypto service owned by this context.
fn crypto(&self) -> &CryptoService {
&self.crypto
}
/// Returns the task service stored in the context's `OnceCell`.
///
/// # Panics
///
/// Panics with "task should be set" if the cell has not been initialized.
/// `AppContext::new` fills the cell before it returns the context, so a
/// context obtained through `new` always has it set.
fn task(&self) -> &TaskService {
self.task.get().expect("task should be set")
}
}

View File

@@ -0,0 +1,4 @@
use serde::{Deserialize, Serialize};
/// Configuration for the crypto service.
///
/// The struct currently has no fields; it only reserves a section in the
/// application configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CryptoConfig {}

View File

@@ -0,0 +1,11 @@
/// Errors produced by the crypto module.
///
/// Every variant wraps the error of one underlying library. The
/// `transparent` variants forward `Display` and `source` to the wrapped
/// error; `CocoonError` formats the wrapped error with `{:?}` instead.
#[derive(Debug, snafu::Snafu)]
pub enum CryptoError {
/// The input was not valid base64.
#[snafu(transparent)]
Base64DecodeError { source: base64::DecodeError },
/// The `cocoon` crate reported an error.
/// `context(false)` makes snafu generate a plain `From<cocoon::Error>`
/// conversion, so `?` can be used on cocoon results directly.
#[snafu(display("CocoonError: {source:?}"), context(false))]
CocoonError { source: cocoon::Error },
/// Bytes could not be converted into a UTF-8 `String`.
#[snafu(transparent)]
FromUtf8Error { source: std::string::FromUtf8Error },
/// JSON serialization or deserialization failed.
#[snafu(transparent)]
SerdeJsonError { source: serde_json::Error },
}

View File

@@ -0,0 +1,9 @@
pub mod config;
pub mod error;
pub mod service;
pub mod userpass;
pub use config::CryptoConfig;
pub use error::CryptoError;
pub use service::CryptoService;
pub use userpass::UserPassCredential;

View File

@@ -0,0 +1,65 @@
use base64::prelude::{BASE64_URL_SAFE, *};
use cocoon::Cocoon;
use rand::Rng;
use serde::{Deserialize, Serialize};
use super::CryptoConfig;
use crate::crypto::error::CryptoError;
/// Service that encrypts and decrypts strings and JSON-serializable
/// credentials.
pub struct CryptoService {
// The configuration is stored but not read by any method of this service
// yet, hence the lint suppression.
#[allow(dead_code)]
config: CryptoConfig,
}
impl CryptoService {
pub async fn from_config(config: CryptoConfig) -> Result<Self, CryptoError> {
Ok(Self { config })
}
pub fn encrypt_data(&self, data: String) -> Result<String, CryptoError> {
let key = rand::rng().random::<[u8; 32]>();
let mut cocoon = Cocoon::new(&key);
let mut data = data.into_bytes();
let detached_prefix = cocoon.encrypt(&mut data)?;
let mut combined = Vec::with_capacity(key.len() + detached_prefix.len() + data.len());
combined.extend_from_slice(&key);
combined.extend_from_slice(&detached_prefix);
combined.extend_from_slice(&data);
Ok(BASE64_URL_SAFE.encode(combined))
}
pub fn decrypt_data(&self, data: &str) -> Result<String, CryptoError> {
let decoded = BASE64_URL_SAFE.decode(data)?;
let (key, remain) = decoded.split_at(32);
let (detached_prefix, data) = remain.split_at(60);
let mut data = data.to_vec();
let cocoon = Cocoon::new(key);
cocoon.decrypt(&mut data, detached_prefix)?;
String::from_utf8(data).map_err(CryptoError::from)
}
pub fn encrypt_credentials<T: Serialize>(
&self,
credentials: &T,
) -> Result<String, CryptoError> {
let json = serde_json::to_string(credentials)?;
self.encrypt_data(json)
}
pub fn decrypt_credentials<T: for<'de> Deserialize<'de>>(
&self,
encrypted: &str,
) -> Result<T, CryptoError> {
let data = self.decrypt_data(encrypted)?;
serde_json::from_str(&data).map_err(CryptoError::from)
}
}

View File

@@ -0,0 +1,19 @@
use std::fmt::Debug;
/// Username/password credential together with optional session data.
pub struct UserPassCredential {
    /// Login name.
    pub username: String,
    /// Login password.
    pub password: String,
    /// Optional user agent string associated with the credential.
    pub user_agent: Option<String>,
    /// Optional cookies associated with the credential.
    pub cookies: Option<String>,
}

/// Redacting `Debug`: username, password and cookies are always printed as
/// the fixed placeholder; only `user_agent` is shown as-is.
impl Debug for UserPassCredential {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "[Secret]";

        let mut out = f.debug_struct("UserPassCredential");
        out.field("username", &REDACTED);
        out.field("password", &REDACTED);
        out.field("cookies", &REDACTED);
        out.field("user_agent", &self.user_agent);
        out.finish()
    }
}

View File

@@ -1,8 +1,8 @@
use std::{ops::Deref, time::Duration};
use sea_orm::{
ConnectOptions, ConnectionTrait, Database, DatabaseBackend, DatabaseConnection, DbBackend,
DbErr, ExecResult, QueryResult, Statement,
ConnectOptions, ConnectionTrait, Database, DatabaseConnection, DbBackend, DbErr, ExecResult,
QueryResult, Statement,
};
use sea_orm_migration::MigratorTrait;
@@ -28,20 +28,21 @@ impl DatabaseService {
let db = Database::connect(opt).await?;
if db.get_database_backend() == DatabaseBackend::Sqlite {
db.execute(Statement::from_string(
DatabaseBackend::Sqlite,
"
PRAGMA foreign_keys = ON;
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA mmap_size = 134217728;
PRAGMA journal_size_limit = 67108864;
PRAGMA cache_size = 2000;
",
))
.await?;
}
// only support postgres for now
// if db.get_database_backend() == DatabaseBackend::Sqlite {
// db.execute(Statement::from_string(
// DatabaseBackend::Sqlite,
// "
// PRAGMA foreign_keys = ON;
// PRAGMA journal_mode = WAL;
// PRAGMA synchronous = NORMAL;
// PRAGMA mmap_size = 134217728;
// PRAGMA journal_size_limit = 67108864;
// PRAGMA cache_size = 2000;
// ",
// ))
// .await?;
// }
if config.auto_migrate {
Migrator::up(&db, None).await?;

View File

@@ -4,13 +4,14 @@ use axum::{
Json,
response::{IntoResponse, Response},
};
use fetch::{FetchError, HttpClientError};
use fetch::{FetchError, HttpClientError, reqwest, reqwest_middleware};
use http::StatusCode;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::Snafu;
use crate::{
auth::AuthError,
crypto::CryptoError,
downloader::DownloaderError,
errors::{OptDynErr, response::StandardErrorResponse},
};
@@ -102,6 +103,14 @@ pub enum RecorderError {
ModelEntityNotFound { entity: Cow<'static, str> },
#[snafu(transparent)]
FetchError { source: FetchError },
#[snafu(display("Credential3rdError: {source}"))]
Credential3rdError {
message: String,
#[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, OptDynErr::some)))]
source: OptDynErr,
},
#[snafu(transparent)]
CryptoError { source: CryptoError },
#[snafu(display("{message}"))]
Whatever {
message: String,
@@ -195,4 +204,16 @@ impl<'de> Deserialize<'de> for RecorderError {
}
}
impl From<reqwest::Error> for RecorderError {
fn from(error: reqwest::Error) -> Self {
FetchError::from(error).into()
}
}
impl From<reqwest_middleware::Error> for RecorderError {
fn from(error: reqwest_middleware::Error) -> Self {
FetchError::from(error).into()
}
}
pub type RecorderResult<T> = Result<T, RecorderError>;

View File

@@ -2,7 +2,10 @@ use url::Url;
/// Resolves `image_src` against `base_url` and normalizes the result.
///
/// The returned URL keeps only the `webp` query parameter (when present) and
/// has no fragment. Every other query parameter is dropped.
///
/// Returns `None` when `image_src` cannot be joined onto `base_url`.
pub fn extract_image_src_from_str(image_src: &str, base_url: &Url) -> Option<Url> {
    let mut image_url = base_url.join(image_src).ok()?;

    // Read the `webp` value *before* modifying the query: clearing the query
    // first would make this lookup always come back empty. The value is
    // copied out so no borrow of `image_url` is alive while it is mutated.
    let webp = image_url
        .query_pairs()
        .find(|(key, _)| key == "webp")
        .map(|(_, value)| value.into_owned());

    match webp {
        Some(value) => {
            // Rebuild the query through the serializer so the (decoded)
            // value is percent-encoded again.
            image_url
                .query_pairs_mut()
                .clear()
                .append_pair("webp", &value);
        }
        None => image_url.set_query(None),
    }
    image_url.set_fragment(None);

    Some(image_url)
}

View File

@@ -1,60 +1,204 @@
use std::{fmt::Debug, ops::Deref};
use std::{fmt::Debug, ops::Deref, sync::Arc};
use fetch::{HttpClient, HttpClientTrait, client::HttpClientCookiesAuth};
use fetch::{HttpClient, HttpClientTrait};
use maplit::hashmap;
use sea_orm::DbErr;
use secrecy::SecretBox;
use serde::{Deserialize, Serialize};
use url::Url;
use util::OptDynErr;
use super::MikanConfig;
use crate::errors::RecorderError;
use super::{MikanConfig, constants::MIKAN_ACCOUNT_MANAGE_PAGE_PATH};
use crate::{
app::AppContextTrait,
crypto::UserPassCredential,
errors::{RecorderError, RecorderResult},
extract::mikan::constants::{MIKAN_LOGIN_PAGE_PATH, MIKAN_LOGIN_PAGE_SEARCH},
models::credential_3rd::{self, Credential3rdType},
};
#[derive(Default, Clone, Deserialize, Serialize)]
pub struct MikanAuthSecrecy {
pub cookie: String,
pub user_agent: Option<String>,
pub struct MikanCredentialForm {
pub password: String,
pub username: String,
pub user_agent: String,
}
impl Debug for MikanAuthSecrecy {
pub type MikanAuthSecrecy = SecretBox<MikanCredentialForm>;
impl Debug for MikanCredentialForm {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MikanAuthSecrecy")
.field("cookie", &String::from("[secrecy]"))
f.debug_struct("MikanCredentialForm")
.field("username", &String::from("[secrecy]"))
.field("password", &String::from("[secrecy]"))
.field("user_agent", &String::from("[secrecy]"))
.finish()
}
}
impl MikanAuthSecrecy {
pub fn into_cookie_auth(self, url: &Url) -> Result<HttpClientCookiesAuth, RecorderError> {
HttpClientCookiesAuth::from_cookies(&self.cookie, url, self.user_agent)
.map_err(RecorderError::from)
}
}
/// HTTP client for the Mikan site, optionally bound to a login credential.
#[derive(Debug)]
pub struct MikanClient {
// Underlying HTTP client used for every request.
http_client: HttpClient,
// Base URL of the Mikan site, taken from the configuration.
base_url: Url,
// Origin of `base_url`, parsed back into a `Url`.
origin_url: Url,
// Credential used by `login`; `None` for a client built by `from_config`,
// set by `fork_with_credential` when a credential id is supplied.
userpass_credential: Option<UserPassCredential>,
}
impl MikanClient {
/// Creates a client without any credential from the Mikan configuration.
///
/// # Errors
///
/// Fails if the HTTP client cannot be built from its configuration or if
/// the origin of the configured base URL cannot be parsed back into a URL.
pub async fn from_config(config: MikanConfig) -> Result<Self, RecorderError> {
    let http_client = HttpClient::from_config(config.http_client)?;

    // The origin (scheme + host + port) is kept separately from the base URL.
    let origin = config.base_url.origin().unicode_serialization();
    let origin_url = Url::parse(&origin)?;

    Ok(Self {
        http_client,
        origin_url,
        base_url: config.base_url,
        userpass_credential: None,
    })
}
pub fn fork_with_auth(&self, secrecy: Option<MikanAuthSecrecy>) -> Result<Self, RecorderError> {
let mut fork = self.http_client.fork();
/// Checks whether the client currently has a logged-in session by
/// requesting the account management page.
///
/// * `Ok(true)`  — the page answered with a success status.
/// * `Ok(false)` — the page answered with a redirect whose `location`
///   header contains the login page path.
/// * `Err(Credential3rdError)` — any other response.
///
/// NOTE(review): the redirect branch can only be observed if the underlying
/// HTTP client does not follow redirects automatically — confirm against
/// the client configuration.
pub async fn has_login(&self) -> RecorderResult<bool> {
    let account_manage_page_url = self.base_url.join(MIKAN_ACCOUNT_MANAGE_PAGE_PATH)?;
    let res = self.http_client.get(account_manage_page_url).send().await?;
    let status = res.status();

    if status.is_success() {
        return Ok(true);
    }

    let redirected_to_login = status.is_redirection()
        && res
            .headers()
            .get("location")
            .and_then(|location| location.to_str().ok())
            .is_some_and(|location_str| location_str.contains(MIKAN_LOGIN_PAGE_PATH));
    if redirected_to_login {
        return Ok(false);
    }

    Err(RecorderError::Credential3rdError {
        message: format!("mikan account check has login failed, status = {}", status),
        source: None.into(),
    })
}
if let Some(secrecy) = secrecy {
let cookie_auth = secrecy.into_cookie_auth(&self.base_url)?;
fork = fork.attach_secrecy(cookie_auth);
/// Logs in to Mikan with the credential attached to this client.
///
/// Steps:
/// 1. fetch the login page so the server sets its antiforgery cookie,
/// 2. read that cookie from the client's cookie store,
/// 3. post the login form (token, user name, password, remember-me),
/// 4. treat a redirect response carrying a `location` header as success.
///
/// # Errors
///
/// Returns `RecorderError::Credential3rdError` when no credential is
/// attached, when either request fails, when the cookie store is missing or
/// cannot be read, when no antiforgery cookie was set, or when the login
/// response is not a redirect. A failure to build the login URL is
/// propagated through `?`.
pub async fn login(&self) -> RecorderResult<()> {
    let userpass_credential =
        self.userpass_credential
            .as_ref()
            .ok_or_else(|| RecorderError::Credential3rdError {
                message: "mikan login failed, credential required".to_string(),
                source: None.into(),
            })?;

    let login_page_url = {
        let mut u = self.base_url.join(MIKAN_LOGIN_PAGE_PATH)?;
        // `Url::set_query` expects the query *without* the leading '?'.
        // The constant includes one, which would otherwise yield
        // `/Account/Login??ReturnUrl=%2F` (parameter name `?ReturnUrl`).
        u.set_query(Some(
            MIKAN_LOGIN_PAGE_SEARCH
                .strip_prefix('?')
                .unwrap_or(MIKAN_LOGIN_PAGE_SEARCH),
        ));
        u
    };

    // access login page to get antiforgery cookie
    self.http_client
        .get(login_page_url.clone())
        .send()
        .await
        .map_err(|error| RecorderError::Credential3rdError {
            message: "failed to get mikan login page".to_string(),
            source: OptDynErr::some_boxed(error),
        })?;

    let antiforgery_cookie = {
        let cookie_store_lock = self.http_client.cookie_store.clone().ok_or_else(|| {
            RecorderError::Credential3rdError {
                message: "failed to get cookie store".to_string(),
                source: None.into(),
            }
        })?;
        // The read guard lives only inside this block, so it is released
        // before the next `.await`.
        let cookie_store =
            cookie_store_lock
                .read()
                .map_err(|_| RecorderError::Credential3rdError {
                    message: "failed to read cookie store".to_string(),
                    source: None.into(),
                })?;

        cookie_store
            .matches(&login_page_url)
            .iter()
            .find(|cookie| cookie.name().starts_with(".AspNetCore.Antiforgery."))
            .map(|cookie| cookie.value().to_string())
    }
    .ok_or_else(|| RecorderError::Credential3rdError {
        message: "mikan login failed, failed to get antiforgery cookie".to_string(),
        source: None.into(),
    })?;

    let login_post_form = hashmap! {
        "__RequestVerificationToken".to_string() => antiforgery_cookie,
        "UserName".to_string() => userpass_credential.username.clone(),
        "Password".to_string() => userpass_credential.password.clone(),
        "RememberMe".to_string() => "true".to_string(),
    };

    let login_post_res = self
        .http_client
        .post(login_page_url.clone())
        .form(&login_post_form)
        .send()
        .await
        .map_err(|err| RecorderError::Credential3rdError {
            message: "mikan login failed".to_string(),
            source: OptDynErr::some_boxed(err),
        })?;

    if login_post_res.status().is_redirection()
        && login_post_res.headers().contains_key("location")
    {
        Ok(())
    } else {
        Err(RecorderError::Credential3rdError {
            message: "mikan login failed, no redirecting".to_string(),
            source: None.into(),
        })
    }
}
/// Creates a new client that shares this client's URLs but uses a forked
/// HTTP client, optionally configured from a stored credential.
///
/// With `credential_id == None` the fork carries no credential. Otherwise
/// the credential is loaded, must be of type `Mikan`, is converted into a
/// `UserPassCredential`, and its cookies / user agent (when present) are
/// attached to the forked HTTP client.
///
/// # Errors
///
/// * a "record not found" error when no credential with that id exists,
/// * `RecorderError::Credential3rdError` when the credential is not a Mikan
///   credential,
/// * any error from loading or converting the credential, attaching the
///   cookies, or building the HTTP client from the fork.
pub async fn fork_with_credential(
    &self,
    ctx: Arc<dyn AppContextTrait>,
    credential_id: Option<i32>,
) -> RecorderResult<Self> {
    let mut fork = self.http_client.fork();

    let userpass_credential_opt = match credential_id {
        None => None,
        Some(credential_id) => {
            let Some(credential) =
                credential_3rd::Model::find_by_id(ctx.clone(), credential_id).await?
            else {
                return Err(RecorderError::from_db_record_not_found(
                    DbErr::RecordNotFound(format!("credential={} not found", credential_id)),
                ));
            };

            if credential.credential_type != Credential3rdType::Mikan {
                return Err(RecorderError::Credential3rdError {
                    message: "credential is not a mikan credential".to_string(),
                    source: None.into(),
                });
            }

            let userpass_credential: UserPassCredential =
                credential.try_into_userpass_credential(ctx)?;

            if let Some(cookies) = userpass_credential.cookies.as_ref() {
                fork = fork.attach_cookies(cookies)?;
            }
            if let Some(user_agent) = userpass_credential.user_agent.as_ref() {
                fork = fork.attach_user_agent(user_agent);
            }

            Some(userpass_credential)
        }
    };

    Ok(Self {
        http_client: HttpClient::from_fork(fork)?,
        base_url: self.base_url.clone(),
        origin_url: self.origin_url.clone(),
        userpass_credential: userpass_credential_opt,
    })
}

View File

@@ -1,3 +1,6 @@
/// Bucket key used for Mikan-related content.
pub const MIKAN_BUCKET_KEY: &str = "mikan";
/// Display name of the placeholder fansub entry (raw / unknown subtitle group).
pub const MIKAN_UNKNOWN_FANSUB_NAME: &str = "生肉/不明字幕";
/// Identifier of the placeholder fansub entry.
pub const MIKAN_UNKNOWN_FANSUB_ID: &str = "202";
/// Path of the Mikan login page.
pub const MIKAN_LOGIN_PAGE_PATH: &str = "/Account/Login";
/// Query string of the Mikan login page.
///
/// NOTE(review): the value includes the leading `?`. `Url::set_query`
/// expects the query without it, so callers passing this constant there
/// must strip the `?` first.
pub const MIKAN_LOGIN_PAGE_SEARCH: &str = "?ReturnUrl=%2F";
/// Path of the Mikan account management page.
pub const MIKAN_ACCOUNT_MANAGE_PAGE_PATH: &str = "/Account/Manage";

View File

@@ -4,18 +4,20 @@ pub mod constants;
pub mod rss_extract;
pub mod web_extract;
pub use client::{MikanAuthSecrecy, MikanClient};
pub use client::{MikanClient, MikanCredentialForm};
pub use config::MikanConfig;
pub use constants::MIKAN_BUCKET_KEY;
pub use rss_extract::{
MikanBangumiAggregationRssChannel, MikanBangumiRssChannel, MikanBangumiRssLink,
MikanBangumiAggregationRssChannel, MikanBangumiRssChannel, MikanBangumiRssUrlMeta,
MikanRssChannel, MikanRssItem, MikanSubscriberAggregationRssChannel,
MikanSubscriberAggregationRssLink, build_mikan_bangumi_rss_link,
build_mikan_subscriber_aggregation_rss_link, extract_mikan_bangumi_id_from_rss_link,
MikanSubscriberAggregationRssUrlMeta, build_mikan_bangumi_rss_url,
build_mikan_subscriber_aggregation_rss_url, extract_mikan_bangumi_id_from_rss_url,
extract_mikan_rss_channel_from_rss_link, extract_mikan_subscriber_aggregation_id_from_rss_link,
};
pub use web_extract::{
MikanBangumiMeta, MikanEpisodeMeta, build_mikan_bangumi_homepage, build_mikan_episode_homepage,
MikanBangumiMeta, MikanEpisodeMeta, MikanSeasonStr, build_mikan_bangumi_homepage_url,
build_mikan_episode_homepage_url, build_mikan_season_flow_url,
extract_mikan_bangumi_indices_meta_from_season_flow_fragment,
extract_mikan_bangumi_meta_from_bangumi_homepage,
extract_mikan_episode_meta_from_episode_homepage,
};

View File

@@ -12,7 +12,7 @@ use crate::{
errors::app_error::{RecorderError, RecorderResult},
extract::mikan::{
MikanClient,
web_extract::{MikanEpisodeHomepage, extract_mikan_episode_id_from_homepage},
web_extract::{MikanEpisodeHomepage, extract_mikan_episode_id_from_homepage_url},
},
};
@@ -135,7 +135,7 @@ impl TryFrom<rss::Item> for MikanRssItem {
let MikanEpisodeHomepage {
mikan_episode_id, ..
} = extract_mikan_episode_id_from_homepage(&homepage).ok_or_else(|| {
} = extract_mikan_episode_id_from_homepage_url(&homepage).ok_or_else(|| {
RecorderError::from_mikan_rss_invalid_field(Cow::Borrowed("mikan_episode_id"))
})?;
@@ -155,17 +155,17 @@ impl TryFrom<rss::Item> for MikanRssItem {
}
#[derive(Debug, Clone)]
pub struct MikanBangumiRssLink {
pub struct MikanBangumiRssUrlMeta {
pub mikan_bangumi_id: String,
pub mikan_fansub_id: Option<String>,
}
#[derive(Debug, Clone)]
pub struct MikanSubscriberAggregationRssLink {
pub struct MikanSubscriberAggregationRssUrlMeta {
pub mikan_aggregation_id: String,
}
pub fn build_mikan_bangumi_rss_link(
pub fn build_mikan_bangumi_rss_url(
mikan_base_url: impl IntoUrl,
mikan_bangumi_id: &str,
mikan_fansub_id: Option<&str>,
@@ -181,7 +181,7 @@ pub fn build_mikan_bangumi_rss_link(
Ok(url)
}
pub fn build_mikan_subscriber_aggregation_rss_link(
pub fn build_mikan_subscriber_aggregation_rss_url(
mikan_base_url: &str,
mikan_aggregation_id: &str,
) -> RecorderResult<Url> {
@@ -192,11 +192,11 @@ pub fn build_mikan_subscriber_aggregation_rss_link(
Ok(url)
}
pub fn extract_mikan_bangumi_id_from_rss_link(url: &Url) -> Option<MikanBangumiRssLink> {
pub fn extract_mikan_bangumi_id_from_rss_url(url: &Url) -> Option<MikanBangumiRssUrlMeta> {
if url.path() == "/RSS/Bangumi" {
url.query_pairs()
.find(|(k, _)| k == "bangumiId")
.map(|(_, v)| MikanBangumiRssLink {
.map(|(_, v)| MikanBangumiRssUrlMeta {
mikan_bangumi_id: v.to_string(),
mikan_fansub_id: url
.query_pairs()
@@ -210,10 +210,10 @@ pub fn extract_mikan_bangumi_id_from_rss_link(url: &Url) -> Option<MikanBangumiR
pub fn extract_mikan_subscriber_aggregation_id_from_rss_link(
url: &Url,
) -> Option<MikanSubscriberAggregationRssLink> {
) -> Option<MikanSubscriberAggregationRssUrlMeta> {
if url.path() == "/RSS/MyBangumi" {
url.query_pairs().find(|(k, _)| k == "token").map(|(_, v)| {
MikanSubscriberAggregationRssLink {
MikanSubscriberAggregationRssUrlMeta {
mikan_aggregation_id: v.to_string(),
}
})
@@ -233,10 +233,10 @@ pub async fn extract_mikan_rss_channel_from_rss_link(
let channel_link = Url::parse(channel.link())?;
if let Some(MikanBangumiRssLink {
if let Some(MikanBangumiRssUrlMeta {
mikan_bangumi_id,
mikan_fansub_id,
}) = extract_mikan_bangumi_id_from_rss_link(&channel_link)
}) = extract_mikan_bangumi_id_from_rss_url(&channel_link)
{
tracing::trace!(
mikan_bangumi_id,
@@ -290,7 +290,7 @@ pub async fn extract_mikan_rss_channel_from_rss_link(
},
))
}
} else if let Some(MikanSubscriberAggregationRssLink {
} else if let Some(MikanSubscriberAggregationRssUrlMeta {
mikan_aggregation_id,
..
}) = extract_mikan_subscriber_aggregation_id_from_rss_link(&channel_link)

View File

@@ -1,22 +1,19 @@
use std::{borrow::Cow, sync::Arc};
use std::{borrow::Cow, fmt};
use async_stream::try_stream;
use bytes::Bytes;
use fetch::{html::fetch_html, image::fetch_image};
use futures::Stream;
use itertools::Itertools;
use html_escape::decode_html_entities;
use scraper::{Html, Selector};
use serde::{Deserialize, Serialize};
use tracing::instrument;
use url::Url;
use super::{
MIKAN_BUCKET_KEY, MikanAuthSecrecy, MikanBangumiRssLink, MikanClient,
extract_mikan_bangumi_id_from_rss_link,
MIKAN_BUCKET_KEY, MikanBangumiRssUrlMeta, MikanClient, extract_mikan_bangumi_id_from_rss_url,
};
use crate::{
app::AppContextTrait,
errors::app_error::{RecorderResult, RecorderError},
errors::app_error::{RecorderError, RecorderResult},
extract::{
html::{extract_background_image_src_from_style_attr, extract_inner_text_from_element_ref},
media::extract_image_src_from_str,
@@ -24,6 +21,29 @@ use crate::{
storage::StorageContentCategory,
};
/// Season selector used when building the Mikan season-flow URL.
///
/// Each variant is serialized, and displayed, as the single CJK character
/// for that season. The serde names and the `Display` output must stay
/// identical and distinct per variant: with empty names the four variants
/// could not be told apart and the `seasonStr` query parameter would be
/// empty.
#[derive(Clone, Debug, Copy, Serialize, Deserialize)]
pub enum MikanSeasonStr {
    /// Spring season.
    #[serde(rename = "春")]
    Spring,
    /// Summer season.
    #[serde(rename = "夏")]
    Summer,
    /// Autumn season.
    #[serde(rename = "秋")]
    Autumn,
    /// Winter season.
    #[serde(rename = "冬")]
    Winter,
}

/// Formats the season as the same character used for (de)serialization.
impl fmt::Display for MikanSeasonStr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let season = match self {
            Self::Spring => "春",
            Self::Summer => "夏",
            Self::Autumn => "秋",
            Self::Winter => "冬",
        };
        f.write_str(season)
    }
}
#[derive(Clone, Debug, PartialEq)]
pub struct MikanEpisodeMeta {
pub homepage: Url,
@@ -36,6 +56,14 @@ pub struct MikanEpisodeMeta {
pub mikan_episode_id: String,
}
/// Bangumi metadata without any fansub information.
///
/// It holds the four fields a `MikanBangumiMeta` takes over unchanged when it
/// is built from this type; the fansub fields are filled in separately.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct MikanBangumiIndexMeta {
// Homepage URL of the bangumi.
pub homepage: Url,
// Poster image URL on the origin site, when one could be extracted.
pub origin_poster_src: Option<Url>,
// Title of the bangumi.
pub bangumi_title: String,
// Mikan identifier of the bangumi.
pub mikan_bangumi_id: String,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct MikanBangumiMeta {
pub homepage: Url,
@@ -53,6 +81,19 @@ pub struct MikanBangumiPosterMeta {
pub poster_src: Option<String>,
}
impl From<MikanBangumiIndexMeta> for MikanBangumiMeta {
fn from(index_meta: MikanBangumiIndexMeta) -> Self {
MikanBangumiMeta {
homepage: index_meta.homepage,
origin_poster_src: index_meta.origin_poster_src,
bangumi_title: index_meta.bangumi_title,
mikan_bangumi_id: index_meta.mikan_bangumi_id,
mikan_fansub_id: None,
fansub: None,
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct MikanEpisodeHomepage {
pub mikan_episode_id: String,
@@ -64,7 +105,7 @@ pub struct MikanBangumiHomepage {
pub mikan_fansub_id: Option<String>,
}
pub fn build_mikan_bangumi_homepage(
pub fn build_mikan_bangumi_homepage_url(
mikan_base_url: Url,
mikan_bangumi_id: &str,
mikan_fansub_id: Option<&str>,
@@ -75,13 +116,29 @@ pub fn build_mikan_bangumi_homepage(
url
}
pub fn build_mikan_episode_homepage(mikan_base_url: Url, mikan_episode_id: &str) -> Url {
/// Builds the URL of the season cover-flow fragment for `year` and
/// `season_str`.
///
/// The path of `mikan_base_url` is replaced with `/Home/BangumiCoverFlow`,
/// and `year` and `seasonStr` are appended to its query.
pub fn build_mikan_season_flow_url(
    mikan_base_url: Url,
    year: i32,
    season_str: MikanSeasonStr,
) -> Url {
    let year_value = year.to_string();
    let season_value = season_str.to_string();

    let mut flow_url = mikan_base_url;
    flow_url.set_path("/Home/BangumiCoverFlow");
    {
        // The serializer writes back into the URL when it is dropped at the
        // end of this scope.
        let mut pairs = flow_url.query_pairs_mut();
        pairs.append_pair("year", &year_value);
        pairs.append_pair("seasonStr", &season_value);
    }
    flow_url
}
/// Builds the homepage URL of an episode by replacing the path of
/// `mikan_base_url` with `/Home/Episode/<mikan_episode_id>`.
pub fn build_mikan_episode_homepage_url(mikan_base_url: Url, mikan_episode_id: &str) -> Url {
    let episode_path = format!("/Home/Episode/{mikan_episode_id}");
    let mut episode_url = mikan_base_url;
    episode_url.set_path(&episode_path);
    episode_url
}
pub fn build_mikan_bangumi_expand_info_url(mikan_base_url: Url, mikan_bangumi_id: &str) -> Url {
pub fn build_mikan_bangumi_expand_subscribed_fragment_url(
mikan_base_url: Url,
mikan_bangumi_id: &str,
) -> Url {
let mut url = mikan_base_url;
url.set_path("/ExpandBangumi");
url.query_pairs_mut()
@@ -90,7 +147,7 @@ pub fn build_mikan_bangumi_expand_info_url(mikan_base_url: Url, mikan_bangumi_id
url
}
pub fn extract_mikan_bangumi_id_from_homepage(url: &Url) -> Option<MikanBangumiHomepage> {
pub fn extract_mikan_bangumi_id_from_homepage_url(url: &Url) -> Option<MikanBangumiHomepage> {
if url.path().starts_with("/Home/Bangumi/") {
let mikan_bangumi_id = url.path().replace("/Home/Bangumi/", "");
@@ -103,7 +160,7 @@ pub fn extract_mikan_bangumi_id_from_homepage(url: &Url) -> Option<MikanBangumiH
}
}
pub fn extract_mikan_episode_id_from_homepage(url: &Url) -> Option<MikanEpisodeHomepage> {
pub fn extract_mikan_episode_id_from_homepage_url(url: &Url) -> Option<MikanEpisodeHomepage> {
if url.path().starts_with("/Home/Episode/") {
let mikan_episode_id = url.path().replace("/Home/Episode/", "");
Some(MikanEpisodeHomepage { mikan_episode_id })
@@ -191,7 +248,7 @@ pub async fn extract_mikan_episode_meta_from_episode_homepage(
tracing::warn!(error = %error);
})?;
let MikanBangumiRssLink {
let MikanBangumiRssUrlMeta {
mikan_bangumi_id,
mikan_fansub_id,
..
@@ -200,7 +257,7 @@ pub async fn extract_mikan_episode_meta_from_episode_homepage(
.next()
.and_then(|el| el.value().attr("href"))
.and_then(|s| mikan_episode_homepage_url.join(s).ok())
.and_then(|rss_link_url| extract_mikan_bangumi_id_from_rss_link(&rss_link_url))
.and_then(|rss_link_url| extract_mikan_bangumi_id_from_rss_url(&rss_link_url))
.ok_or_else(|| {
RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("mikan_bangumi_id"))
})
@@ -223,7 +280,7 @@ pub async fn extract_mikan_episode_meta_from_episode_homepage(
let MikanEpisodeHomepage {
mikan_episode_id, ..
} = extract_mikan_episode_id_from_homepage(&mikan_episode_homepage_url)
} = extract_mikan_episode_id_from_homepage_url(&mikan_episode_homepage_url)
.ok_or_else(|| {
RecorderError::from_mikan_meta_missing_field(Cow::Borrowed("mikan_episode_id"))
})
@@ -303,9 +360,9 @@ pub async fn extract_mikan_bangumi_meta_from_bangumi_homepage(
.next()
.and_then(|el| el.value().attr("href"))
.and_then(|s| mikan_bangumi_homepage_url.join(s).ok())
.and_then(|rss_link_url| extract_mikan_bangumi_id_from_rss_link(&rss_link_url))
.and_then(|rss_link_url| extract_mikan_bangumi_id_from_rss_url(&rss_link_url))
.map(
|MikanBangumiRssLink {
|MikanBangumiRssUrlMeta {
mikan_bangumi_id, ..
}| mikan_bangumi_id,
)
@@ -325,7 +382,7 @@ pub async fn extract_mikan_bangumi_meta_from_bangumi_homepage(
})
});
let (mikan_fansub_id, fansub_name) = mikan_bangumi_homepage_url
let (mikan_fansub_id, fansub) = mikan_bangumi_homepage_url
.fragment()
.and_then(|id| {
html.select(
@@ -341,7 +398,7 @@ pub async fn extract_mikan_bangumi_meta_from_bangumi_homepage(
bangumi_title,
mikan_bangumi_id,
origin_poster_src = origin_poster_src.as_ref().map(|url| url.as_str()),
fansub_name,
fansub,
mikan_fansub_id,
"mikan bangumi meta extracted"
);
@@ -351,154 +408,141 @@ pub async fn extract_mikan_bangumi_meta_from_bangumi_homepage(
bangumi_title,
origin_poster_src,
mikan_bangumi_id,
fansub: fansub_name,
fansub,
mikan_fansub_id,
})
}
#[instrument(skip_all, fields(my_bangumi_page_url, auth_secrecy = ?auth_secrecy, history = history.len()))]
pub fn extract_mikan_bangumis_meta_from_my_bangumi_page(
context: Arc<dyn AppContextTrait>,
my_bangumi_page_url: Url,
auth_secrecy: Option<MikanAuthSecrecy>,
history: &[Arc<RecorderResult<MikanBangumiMeta>>],
) -> impl Stream<Item = RecorderResult<MikanBangumiMeta>> {
try_stream! {
let http_client = &context.mikan().fork_with_auth(auth_secrecy.clone())?;
#[instrument]
pub fn extract_mikan_bangumi_indices_meta_from_season_flow_fragment(
season_flow_fragment: &str,
mikan_base_url: Url,
) -> Vec<MikanBangumiIndexMeta> {
let html = Html::parse_fragment(season_flow_fragment);
let mikan_base_url = Url::parse(&my_bangumi_page_url.origin().unicode_serialization())?;
let bangumi_empty_selector = &Selector::parse(".no-subscribe-bangumi").unwrap();
let content = fetch_html(http_client, my_bangumi_page_url.clone()).await?;
if html.select(bangumi_empty_selector).next().is_some() {
return vec![];
}
let fansub_container_selector =
&Selector::parse(".js-expand_bangumi-subgroup.js-subscribed").unwrap();
let fansub_title_selector = &Selector::parse(".tag-res-name[title]").unwrap();
let fansub_id_selector =
&Selector::parse(".active[data-subtitlegroupid][data-bangumiid]").unwrap();
let bangumi_item_selector = &Selector::parse(".mine.an-box ul.an-ul>li").unwrap();
let bangumi_poster_span_selector = &Selector::parse("span[data-src][data-bangumiid]").unwrap();
let bangumi_title_a_selector = &Selector::parse(".an-info-group a.an-text[title]").unwrap();
let bangumi_items = {
let html = Html::parse_document(&content);
let mut items = vec![];
for bangumi_item in html.select(bangumi_item_selector) {
let bangumi_poster_span = bangumi_item.select(bangumi_poster_span_selector).next();
let bangumi_title_a = bangumi_item.select(bangumi_title_a_selector).next();
if let (Some(bangumi_poster_span), Some(bangumi_title_a)) =
(bangumi_poster_span, bangumi_title_a)
{
let origin_poster_src = bangumi_poster_span
.attr("data-src")
.and_then(|data_src| extract_image_src_from_str(data_src, &mikan_base_url));
let bangumi_title = bangumi_title_a
.attr("title")
.map(|title| decode_html_entities(&title).trim().to_string());
let mikan_bangumi_id = bangumi_poster_span
.attr("data-bangumiid")
.map(|id| id.to_string());
let bangumi_container_selector = &Selector::parse(".sk-bangumi .an-ul>li").unwrap();
let bangumi_info_selector = &Selector::parse(".an-info a.an-text").unwrap();
let bangumi_poster_selector =
&Selector::parse("span[data-src][data-bangumiid], span[data-bangumiid][style]")
.unwrap();
html.select(bangumi_container_selector)
.filter_map(|bangumi_elem| {
let title_and_href_elem =
bangumi_elem.select(bangumi_info_selector).next();
let poster_elem = bangumi_elem.select(bangumi_poster_selector).next();
if let (Some(bangumi_home_page_url), Some(bangumi_title)) = (
title_and_href_elem.and_then(|elem| elem.attr("href")),
title_and_href_elem.and_then(|elem| elem.attr("title")),
) {
let origin_poster_src = poster_elem.and_then(|ele| {
ele.attr("data-src")
.and_then(|data_src| {
extract_image_src_from_str(data_src, &mikan_base_url)
})
.or_else(|| {
ele.attr("style").and_then(|style| {
extract_background_image_src_from_style_attr(
style,
&mikan_base_url,
)
})
})
});
let bangumi_title = bangumi_title.to_string();
let bangumi_home_page_url =
my_bangumi_page_url.join(bangumi_home_page_url).ok()?;
let MikanBangumiHomepage {
mikan_bangumi_id, ..
} = extract_mikan_bangumi_id_from_homepage(&bangumi_home_page_url)?;
if let Some(origin_poster_src) = origin_poster_src.as_ref() {
tracing::trace!(
origin_poster_src = origin_poster_src.as_str(),
bangumi_title,
mikan_bangumi_id,
"bangumi info extracted"
);
} else {
tracing::warn!(
bangumi_title,
mikan_bangumi_id,
"bangumi info extracted, but failed to extract poster_src"
);
}
let bangumi_expand_info_url = build_mikan_bangumi_expand_info_url(
mikan_base_url.clone(),
&mikan_bangumi_id,
);
Some((
if let (Some(bangumi_title), Some(mikan_bangumi_id)) = (bangumi_title, mikan_bangumi_id)
{
let homepage = build_mikan_bangumi_homepage_url(
mikan_base_url.clone(),
&mikan_bangumi_id,
None,
);
if let Some(origin_poster_src) = origin_poster_src.as_ref() {
tracing::trace!(
origin_poster_src = origin_poster_src.as_str(),
bangumi_title,
mikan_bangumi_id,
bangumi_expand_info_url,
origin_poster_src,
))
"bangumi index meta extracted"
);
} else {
tracing::warn!(
bangumi_title,
mikan_bangumi_id,
"bangumi index meta extracted, but failed to extract poster_src"
);
}
items.push(MikanBangumiIndexMeta {
homepage,
origin_poster_src,
bangumi_title,
mikan_bangumi_id,
})
}
}
}
items
}
#[instrument(skip_all, fields(mikan_bangumi_index = mikan_bangumi_index.mikan_bangumi_id.as_str()))]
pub fn extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
mikan_bangumi_index: MikanBangumiIndexMeta,
expand_subscribed_fragment: &str,
mikan_base_url: Url,
) -> Option<MikanBangumiMeta> {
let html = Html::parse_fragment(expand_subscribed_fragment);
let fansub_container_selector =
&Selector::parse(".js-expand_bangumi-subgroup.js-subscribed").unwrap();
let fansub_title_selector = &Selector::parse(".tag-res-name[title]").unwrap();
let fansub_id_selector =
&Selector::parse(".active[data-subtitlegroupid][data-bangumiid]").unwrap();
if let Some((fansub_name, mikan_fansub_id)) = {
html.select(fansub_container_selector)
.next()
.and_then(|fansub_info| {
if let (Some(fansub_name), Some(mikan_fansub_id)) = (
fansub_info
.select(fansub_title_selector)
.next()
.and_then(|ele| ele.attr("title"))
.map(String::from),
fansub_info
.select(fansub_id_selector)
.next()
.and_then(|ele| ele.attr("data-subtitlegroupid"))
.map(String::from),
) {
Some((fansub_name, mikan_fansub_id))
} else {
None
}
})
.collect_vec()
};
} {
tracing::trace!(fansub_name, mikan_fansub_id, "subscribed fansub extracted");
let mikan_bangumi_id = mikan_bangumi_index.mikan_bangumi_id;
let bangumi_title = mikan_bangumi_index.bangumi_title;
let origin_poster_src = mikan_bangumi_index.origin_poster_src;
for (idx, (bangumi_title, mikan_bangumi_id, bangumi_expand_info_url, origin_poster_src)) in
bangumi_items.iter().enumerate()
{
if history.get(idx).is_some() {
continue;
} else if let Some((fansub_name, mikan_fansub_id)) = {
let bangumi_expand_info_content =
fetch_html(http_client, bangumi_expand_info_url.clone()).await?;
let bangumi_expand_info_fragment =
Html::parse_fragment(&bangumi_expand_info_content);
bangumi_expand_info_fragment
.select(fansub_container_selector)
.next()
.and_then(|fansub_info| {
if let (Some(fansub_name), Some(mikan_fansub_id)) = (
fansub_info
.select(fansub_title_selector)
.next()
.and_then(|ele| ele.attr("title"))
.map(String::from),
fansub_info
.select(fansub_id_selector)
.next()
.and_then(|ele| ele.attr("data-subtitlegroupid"))
.map(String::from),
) {
Some((fansub_name, mikan_fansub_id))
} else {
None
}
})
} {
tracing::trace!(fansub_name, mikan_fansub_id, "subscribed fansub extracted");
let item = MikanBangumiMeta {
homepage: build_mikan_bangumi_homepage(
mikan_base_url.clone(),
mikan_bangumi_id,
Some(&mikan_fansub_id),
),
bangumi_title: bangumi_title.to_string(),
mikan_bangumi_id: mikan_bangumi_id.to_string(),
mikan_fansub_id: Some(mikan_fansub_id),
fansub: Some(fansub_name),
origin_poster_src: origin_poster_src.clone(),
};
yield item;
}
}
Some(MikanBangumiMeta {
homepage: build_mikan_bangumi_homepage_url(
mikan_base_url.clone(),
&mikan_bangumi_id,
Some(&mikan_fansub_id),
),
bangumi_title: bangumi_title.to_string(),
mikan_bangumi_id: mikan_bangumi_id.to_string(),
mikan_fansub_id: Some(mikan_fansub_id),
fansub: Some(fansub_name),
origin_poster_src: origin_poster_src.clone(),
})
} else {
tracing::trace!("subscribed fansub not found");
None
}
}
#[cfg(test)]
mod test {
#![allow(unused_variables)]
use std::{fs, sync::Arc};
use futures::{TryStreamExt, pin_mut};
use http::header;
use rstest::{fixture, rstest};
@@ -507,9 +551,12 @@ mod test {
use zune_image::{codecs::ImageFormat, image::Image};
use super::*;
use crate::test_utils::{
app::UnitTestAppContext, mikan::build_testing_mikan_client,
tracing::try_init_testing_tracing,
use crate::{
extract::mikan::MikanCredentialForm,
test_utils::{
app::UnitTestAppContext, mikan::build_testing_mikan_client,
tracing::try_init_testing_tracing,
},
};
#[fixture]
@@ -590,7 +637,9 @@ mod test {
#[rstest]
#[tokio::test]
async fn test_extract_mikan_bangumi_meta_from_bangumi_homepage(before_each: ()) -> RecorderResult<()> {
async fn test_extract_mikan_bangumi_meta_from_bangumi_homepage(
before_each: (),
) -> RecorderResult<()> {
let mut mikan_server = mockito::Server::new_async().await;
let mikan_base_url = Url::parse(&mikan_server.url())?;
let mikan_client = build_testing_mikan_client(mikan_base_url.clone()).await?;
@@ -626,95 +675,217 @@ mod test {
}
#[rstest]
#[tokio::test]
async fn test_extract_mikan_bangumis_meta_from_my_bangumi_page(before_each: ()) -> RecorderResult<()> {
let mut mikan_server = mockito::Server::new_async().await;
#[test]
fn test_extract_mikan_bangumi_indices_meta_from_season_flow_fragment(
before_each: (),
) -> RecorderResult<()> {
let fragment =
fs::read_to_string("tests/resources/mikan/BangumiCoverFlow-2025-spring.html")?;
let mikan_base_url = Url::parse(&mikan_server.url())?;
let my_bangumi_page_url = mikan_base_url.join("/Home/MyBangumi")?;
let context = Arc::new(
UnitTestAppContext::builder()
.mikan(build_testing_mikan_client(mikan_base_url.clone()).await?)
.build(),
let indices = extract_mikan_bangumi_indices_meta_from_season_flow_fragment(
&fragment,
Url::parse("https://mikanani.me/")?,
);
{
let my_bangumi_without_cookie_mock = mikan_server
.mock("GET", my_bangumi_page_url.path())
.match_header(header::COOKIE, mockito::Matcher::Missing)
.with_body_from_file("tests/resources/mikan/MyBangumi-noauth.htm")
.create_async()
.await;
tracing::info!("indices: {:#?}", &indices[0]);
let bangumi_metas = extract_mikan_bangumis_meta_from_my_bangumi_page(
context.clone(),
my_bangumi_page_url.clone(),
None,
&[],
);
pin_mut!(bangumi_metas);
let bangumi_metas = bangumi_metas.try_collect::<Vec<_>>().await?;
assert!(bangumi_metas.is_empty());
assert!(my_bangumi_without_cookie_mock.matched_async().await);
}
{
let my_bangumi_with_cookie_mock = mikan_server
.mock("GET", my_bangumi_page_url.path())
.match_header(
header::COOKIE,
mockito::Matcher::AllOf(vec![
mockito::Matcher::Regex(String::from(".*\\.AspNetCore\\.Antiforgery.*")),
mockito::Matcher::Regex(String::from(
".*\\.AspNetCore\\.Identity\\.Application.*",
)),
]),
)
.with_body_from_file("tests/resources/mikan/MyBangumi.htm")
.create_async()
.await;
let expand_bangumi_mock = mikan_server
.mock("GET", "/ExpandBangumi")
.match_query(mockito::Matcher::Any)
.with_body_from_file("tests/resources/mikan/ExpandBangumi.htm")
.create_async()
.await;
let auth_secrecy = Some(MikanAuthSecrecy {
cookie: String::from(
"mikan-announcement=1; .AspNetCore.Antiforgery.abc=abc; \
.AspNetCore.Identity.Application=abc; ",
),
user_agent: Some(String::from(
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like \
Gecko) Chrome/133.0.0.0 Safari/537.36 Edg/133.0.0.0",
)),
});
let bangumi_metas = extract_mikan_bangumis_meta_from_my_bangumi_page(
context.clone(),
my_bangumi_page_url,
auth_secrecy,
&[],
);
pin_mut!(bangumi_metas);
let bangumi_metas = bangumi_metas.try_collect::<Vec<_>>().await?;
assert!(!bangumi_metas.is_empty());
assert!(bangumi_metas[0].origin_poster_src.is_some());
assert!(my_bangumi_with_cookie_mock.matched_async().await);
expand_bangumi_mock.expect(bangumi_metas.len());
}
assert_eq!(indices.len(), 49);
let first = &indices[0];
assert_eq!(first.bangumi_title, "吉伊卡哇");
assert_eq!(first.mikan_bangumi_id, "3288");
assert_eq!(
first.homepage.to_string(),
String::from("https://mikanani.me/Home/Bangumi/3288")
);
assert_eq!(
first
.origin_poster_src
.as_ref()
.map(|s| s.to_string())
.unwrap_or_default(),
String::from("https://mikanani.me/images/Bangumi/202204/d8ef46c0.jpg")
);
Ok(())
}
/// The `-noauth` season cover-flow fixture must not produce any bangumi index.
#[rstest]
#[test]
fn test_extract_mikan_bangumi_indices_meta_from_season_flow_fragment_noauth(
    before_each: (),
) -> RecorderResult<()> {
    let noauth_fragment =
        fs::read_to_string("tests/resources/mikan/BangumiCoverFlow-2025-spring-noauth.html")?;
    let mikan_base_url = Url::parse("https://mikanani.me/")?;

    let extracted = extract_mikan_bangumi_indices_meta_from_season_flow_fragment(
        &noauth_fragment,
        mikan_base_url,
    );

    assert!(extracted.is_empty());
    Ok(())
}
/// Parses the `ExpandBangumi-3599.html` fixture and checks that the subscribed
/// fansub (id `370`, name `LoliHouse`) is merged with the supplied index meta.
#[rstest]
#[test]
fn test_extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
    before_each: (),
) -> RecorderResult<()> {
    let poster_url = Url::parse("https://mikanani.me/images/Bangumi/202504/076c1094.jpg")?;
    let index_meta = MikanBangumiIndexMeta {
        homepage: Url::parse("https://mikanani.me/Home/Bangumi/3599")?,
        origin_poster_src: Some(poster_url),
        bangumi_title: String::from("夏日口袋"),
        mikan_bangumi_id: String::from("3599"),
    };

    let expand_fragment = fs::read_to_string("tests/resources/mikan/ExpandBangumi-3599.html")?;
    let mikan_base_url = Url::parse("https://mikanani.me/")?;

    let extracted = extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
        index_meta.clone(),
        &expand_fragment,
        mikan_base_url,
    )
    .expect("bangumi should not be None");

    let expected_homepage = Url::parse("https://mikanani.me/Home/Bangumi/3599#370")?;
    assert_eq!(extracted.homepage, expected_homepage);

    // Everything taken from the index meta must be carried over unchanged.
    assert_eq!(extracted.bangumi_title, index_meta.bangumi_title);
    assert_eq!(extracted.mikan_bangumi_id, index_meta.mikan_bangumi_id);
    assert_eq!(extracted.origin_poster_src, index_meta.origin_poster_src);

    // Fansub details come from the fragment itself.
    assert_eq!(extracted.mikan_fansub_id, Some(String::from("370")));
    assert_eq!(extracted.fansub, Some(String::from("LoliHouse")));

    Ok(())
}
/// The `-noauth` expand fragment contains no subscribed fansub, so extraction
/// is expected to return `None`.
#[rstest]
#[test]
fn test_extract_mikan_bangumi_meta_from_expand_subscribed_fragment_noauth(
    before_each: (),
) -> RecorderResult<()> {
    let poster_url = Url::parse("https://mikanani.me/images/Bangumi/202504/076c1094.jpg")?;
    let index_meta = MikanBangumiIndexMeta {
        homepage: Url::parse("https://mikanani.me/Home/Bangumi/3599")?,
        origin_poster_src: Some(poster_url),
        bangumi_title: String::from("夏日口袋"),
        mikan_bangumi_id: String::from("3599"),
    };

    let noauth_fragment =
        fs::read_to_string("tests/resources/mikan/ExpandBangumi-3599-noauth.html")?;
    let mikan_base_url = Url::parse("https://mikanani.me/")?;

    let extracted = extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
        index_meta.clone(),
        &noauth_fragment,
        mikan_base_url,
    );

    assert!(extracted.is_none());
    Ok(())
}
// #[rstest]
// #[tokio::test]
// async fn test_extract_mikan_bangumis_meta_from_my_bangumi_page(
// before_each: (),
// ) -> RecorderResult<()> {
// let mut mikan_server = mockito::Server::new_async().await;
// let mikan_base_url = Url::parse(&mikan_server.url())?;
// let my_bangumi_page_url = mikan_base_url.join("/Home/MyBangumi")?;
// let context = Arc::new(
// UnitTestAppContext::builder()
//
// .mikan(build_testing_mikan_client(mikan_base_url.clone()).await?)
// .build(),
// );
// {
// let my_bangumi_without_cookie_mock = mikan_server
// .mock("GET", my_bangumi_page_url.path())
// .match_header(header::COOKIE, mockito::Matcher::Missing)
//
// .with_body_from_file("tests/resources/mikan/MyBangumi-noauth.htm")
// .create_async()
// .await;
// let bangumi_metas =
// extract_mikan_bangumis_meta_from_my_bangumi_page(
// context.clone(), my_bangumi_page_url.clone(),
// None,
// &[],
// );
// pin_mut!(bangumi_metas);
// let bangumi_metas = bangumi_metas.try_collect::<Vec<_>>().await?;
// assert!(bangumi_metas.is_empty());
// assert!(my_bangumi_without_cookie_mock.matched_async().await);
// }
// {
// let my_bangumi_with_cookie_mock = mikan_server
// .mock("GET", my_bangumi_page_url.path())
// .match_header(
// header::COOKIE,
// mockito::Matcher::AllOf(vec![
//
// mockito::Matcher::Regex(String::from(".*\\.AspNetCore\\.Antiforgery.*")),
// mockito::Matcher::Regex(String::from(
// ".*\\.AspNetCore\\.Identity\\.Application.*",
// )),
// ]),
// )
// .with_body_from_file("tests/resources/mikan/MyBangumi.htm")
// .create_async()
// .await;
// let expand_bangumi_mock = mikan_server
// .mock("GET", "/ExpandBangumi")
// .match_query(mockito::Matcher::Any)
//
// .with_body_from_file("tests/resources/mikan/ExpandBangumi.htm")
// .create_async()
// .await;
// let auth_secrecy = Some(MikanCredentialForm {
// username: String::from("test_username"),
// password: String::from("test_password"),
// user_agent: String::from(
// "Mozilla/5.0 (Windows NT 10.0; Win64; x64)
// AppleWebKit/537.36 (KHTML, like \ Gecko)
// Chrome/133.0.0.0 Safari/537.36 Edg/133.0.0.0", ),
// });
// let bangumi_metas =
// extract_mikan_bangumis_meta_from_my_bangumi_page(
// context.clone(), my_bangumi_page_url,
// auth_secrecy,
// &[],
// );
// pin_mut!(bangumi_metas);
// let bangumi_metas = bangumi_metas.try_collect::<Vec<_>>().await?;
// assert!(!bangumi_metas.is_empty());
// assert!(bangumi_metas[0].origin_poster_src.is_some());
// assert!(my_bangumi_with_cookie_mock.matched_async().await);
// expand_bangumi_mock.expect(bangumi_metas.len());
// }
// Ok(())
// }
}

View File

@@ -14,6 +14,7 @@ pub use downloader;
pub mod app;
pub mod auth;
pub mod cache;
pub mod crypto;
pub mod database;
pub mod errors;
pub mod extract;

View File

@@ -33,6 +33,7 @@ pub enum Subscriptions {
Category,
SourceUrl,
Enabled,
CredentialId,
}
#[derive(DeriveIden)]
@@ -137,6 +138,18 @@ pub enum Auth {
AuthType,
}
/// Identifiers for the third-party credential table and its columns, used by
/// the migrations when building schema statements.
#[derive(DeriveIden)]
pub enum Credential3rd {
// The table itself.
Table,
// Primary key.
Id,
// Owning subscriber (foreign key target is `Subscribers::Id` in the migration).
SubscriberId,
// Kind of third-party service the credential belongs to.
CredentialType,
// Optional credential payload columns.
Cookies,
Username,
Password,
UserAgent,
}
macro_rules! create_postgres_enum_for_active_enum {
($manager: expr, $active_enum: expr, $($enum_value:expr),+) => {
{

View File

@@ -0,0 +1,107 @@
use async_trait::async_trait;
use sea_orm_migration::{
prelude::*,
schema::{string_null, *},
};
use super::defs::{CustomSchemaManagerExt, GeneralIds, table_auto_z};
use crate::{
migrations::defs::{Credential3rd, Subscribers, Subscriptions},
models::credential_3rd::{Credential3rdType, Credential3rdTypeEnum},
};
/// Migration that creates the `Credential3rd` table and adds the
/// `Subscriptions::CredentialId` column referencing it.
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait]
impl MigrationTrait for Migration {
/// Applies the migration.
///
/// Steps, in order: create the Postgres enum for `Credential3rdTypeEnum`,
/// create the credential table, index its `CredentialType` column, install
/// the auto-update trigger for `GeneralIds::UpdatedAt`, then add a nullable
/// `CredentialId` foreign-key column to `Subscriptions`.
///
/// Any failing step returns its `DbErr` immediately; later steps are not run.
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Register the Postgres enum type with its single value `Mikan`.
create_postgres_enum_for_active_enum!(
manager,
Credential3rdTypeEnum,
Credential3rdType::Mikan
)
.await?;
// Credential table. `table_auto_z` is a project helper; presumably it adds the
// timestamp columns that the trigger below relies on — TODO confirm.
manager
.create_table(
table_auto_z(Credential3rd::Table)
.col(pk_auto(Credential3rd::Id))
.col(integer(Credential3rd::SubscriberId))
// NOTE(review): this column is declared with `string(...)` although the enum
// type is created just above and the entity model declares
// `db_type = "Enum"` — confirm whether the enum column type was intended here.
.col(string(Credential3rd::CredentialType))
// All credential payload columns are nullable.
.col(string_null(Credential3rd::Cookies))
.col(string_null(Credential3rd::Username))
.col(string_null(Credential3rd::Password))
.col(string_null(Credential3rd::UserAgent))
// Credentials follow their subscriber: updates and deletes cascade.
.foreign_key(
ForeignKey::create()
.name("fk_credential_3rd_subscriber_id")
.from(Credential3rd::Table, Credential3rd::SubscriberId)
.to(Subscribers::Table, Subscribers::Id)
.on_update(ForeignKeyAction::Cascade)
.on_delete(ForeignKeyAction::Cascade),
)
.to_owned(),
)
.await?;
// Index on the credential type column.
manager
.create_index(
Index::create()
.name("idx_credential_3rd_credential_type")
.table(Credential3rd::Table)
.col(Credential3rd::CredentialType)
.to_owned(),
)
.await?;
// Project helper that installs a trigger for the `UpdatedAt` column.
manager
.create_postgres_auto_update_ts_trigger_for_col(
Credential3rd::Table,
GeneralIds::UpdatedAt,
)
.await?;
// Link subscriptions to an optional credential. Deleting the credential sets
// the reference to NULL instead of deleting the subscription.
manager
.alter_table(
Table::alter()
.table(Subscriptions::Table)
.add_column_if_not_exists(integer_null(Subscriptions::CredentialId))
.add_foreign_key(
TableForeignKey::new()
.name("fk_subscriptions_credential_id")
.from_tbl(Subscriptions::Table)
.from_col(Subscriptions::CredentialId)
.to_tbl(Credential3rd::Table)
.to_col(Credential3rd::Id)
.on_update(ForeignKeyAction::Cascade)
.on_delete(ForeignKeyAction::SetNull),
)
.to_owned(),
)
.await?;
Ok(())
}
/// Reverts the migration in reverse dependency order: drop the
/// `Subscriptions::CredentialId` column, drop the credential table, then drop
/// the Postgres enum type.
///
/// NOTE(review): the index, the foreign keys and the update trigger are not
/// dropped explicitly; presumably they go away with the column/table — confirm
/// for the trigger helper in particular.
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// The referencing column must go before the referenced table.
manager
.alter_table(
Table::alter()
.table(Subscriptions::Table)
.drop_column(Subscriptions::CredentialId)
.to_owned(),
)
.await?;
manager
.drop_table(Table::drop().table(Credential3rd::Table).to_owned())
.await?;
// The enum type is dropped last, after the table has been removed.
manager
.drop_postgres_enum_for_active_enum(Credential3rdTypeEnum)
.await?;
Ok(())
}
}

View File

@@ -7,6 +7,7 @@ pub mod m20220101_000001_init;
pub mod m20240224_082543_add_downloads;
pub mod m20240225_060853_subscriber_add_downloader;
pub mod m20241231_000001_auth;
pub mod m20250501_021523_credential_3rd;
pub struct Migrator;
@@ -18,6 +19,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240224_082543_add_downloads::Migration),
Box::new(m20240225_060853_subscriber_add_downloader::Migration),
Box::new(m20241231_000001_auth::Migration),
Box::new(m20250501_021523_credential_3rd::Migration),
]
}
}

View File

@@ -0,0 +1,143 @@
use std::sync::Arc;
use async_trait::async_trait;
use sea_orm::{ActiveValue, prelude::*};
use serde::{Deserialize, Serialize};
use crate::{
app::AppContextTrait,
crypto::UserPassCredential,
errors::{RecorderError, RecorderResult},
};
/// Kind of third-party service a stored credential belongs to.
///
/// Mapped to the database enum `credential_3rd_type`; the variant `Mikan` is
/// stored as the string value `"mikan"`.
#[derive(
Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize,
)]
#[sea_orm(
rs_type = "String",
db_type = "Enum",
enum_name = "credential_3rd_type"
)]
pub enum Credential3rdType {
#[sea_orm(string_value = "mikan")]
Mikan,
}
/// Row of the `credential_3rd` table: a third-party credential owned by a
/// subscriber.
///
/// `username`, `password` and `cookies` are passed through the crypto service's
/// `decrypt_credentials` in `try_into_userpass_credential`, so they are expected
/// to hold encrypted values; `user_agent` is used as-is.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, DeriveEntityModel)]
#[sea_orm(table_name = "credential_3rd")]
pub struct Model {
#[sea_orm(default_expr = "Expr::current_timestamp()")]
pub created_at: DateTimeUtc,
#[sea_orm(default_expr = "Expr::current_timestamp()")]
pub updated_at: DateTimeUtc,
#[sea_orm(primary_key)]
pub id: i32,
// Owning subscriber (see `Relation::Subscriber`).
pub subscriber_id: i32,
pub credential_type: Credential3rdType,
// Optional secrets; all three go through decryption before use.
pub cookies: Option<String>,
pub username: Option<String>,
pub password: Option<String>,
// Not decrypted when building a `UserPassCredential`.
pub user_agent: Option<String>,
}
/// Relations of the credential entity.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
// Each credential belongs to exactly one subscriber; updates and deletes of
// the subscriber cascade to the credential.
#[sea_orm(
belongs_to = "super::subscribers::Entity",
from = "Column::SubscriberId",
to = "super::subscribers::Column::Id",
on_update = "Cascade",
on_delete = "Cascade"
)]
Subscriber,
// A credential may be referenced by many subscriptions.
#[sea_orm(has_many = "super::subscriptions::Entity")]
Subscription,
}
/// Exposes the credential -> subscriber relation for related-entity queries.
impl Related<super::subscribers::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscriber.def()
}
}
/// Exposes the credential -> subscriptions relation for related-entity queries.
impl Related<super::subscriptions::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscription.def()
}
}
// No custom hooks: the default `ActiveModelBehavior` methods are used. Encryption
// is NOT applied automatically on save; callers use `ActiveModel::try_encrypt`.
#[async_trait]
impl ActiveModelBehavior for ActiveModel {}
impl ActiveModel {
    /// Encrypts the secrets that are about to be written.
    ///
    /// For each of `username`, `password` and `cookies` (in that order), a value
    /// in the `ActiveValue::Set(Some(_))` state is replaced by the output of the
    /// crypto service's `encrypt_credentials`. Fields in any other state are left
    /// untouched.
    ///
    /// # Errors
    /// Returns the first error produced by `encrypt_credentials`.
    pub async fn try_encrypt(mut self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<Self> {
        let crypto = ctx.crypto();

        if let ActiveValue::Set(Some(plain)) = &self.username {
            let sealed = crypto.encrypt_credentials(plain)?;
            self.username = ActiveValue::Set(Some(sealed));
        }

        if let ActiveValue::Set(Some(plain)) = &self.password {
            let sealed = crypto.encrypt_credentials(plain)?;
            self.password = ActiveValue::Set(Some(sealed));
        }

        if let ActiveValue::Set(Some(plain)) = &self.cookies {
            let sealed = crypto.encrypt_credentials(plain)?;
            self.cookies = ActiveValue::Set(Some(sealed));
        }

        Ok(self)
    }
}
impl Model {
    /// Looks up the credential row whose primary key is `id`.
    ///
    /// # Errors
    /// Propagates the database error of the underlying query.
    pub async fn find_by_id(
        ctx: Arc<dyn AppContextTrait>,
        id: i32,
    ) -> RecorderResult<Option<Self>> {
        let found = Entity::find_by_id(id).one(ctx.db()).await?;
        Ok(found)
    }

    /// Consumes the row and turns it into a decrypted [`UserPassCredential`].
    ///
    /// `username` and `password` are mandatory and decrypted with the crypto
    /// service; `cookies` is decrypted only when present; `user_agent` is passed
    /// through unchanged.
    ///
    /// # Errors
    /// * `RecorderError::Credential3rdError` when `username` or `password` is `None`.
    /// * Any error returned by `decrypt_credentials`.
    pub fn try_into_userpass_credential(
        self,
        ctx: Arc<dyn AppContextTrait>,
    ) -> RecorderResult<UserPassCredential> {
        let Self {
            username: sealed_username,
            password: sealed_password,
            cookies: sealed_cookies,
            user_agent,
            ..
        } = self;
        let crypto = ctx.crypto();

        // Shared constructor for the "required field is missing" error.
        let missing_field = |message: &str| RecorderError::Credential3rdError {
            message: message.to_string(),
            source: None.into(),
        };

        let Some(sealed_username) = sealed_username else {
            return Err(missing_field("UserPassCredential username is required"));
        };
        let username: String = crypto.decrypt_credentials(&sealed_username)?;

        let Some(sealed_password) = sealed_password else {
            return Err(missing_field("UserPassCredential password is required"));
        };
        let password: String = crypto.decrypt_credentials(&sealed_password)?;

        let cookies: Option<String> = match sealed_cookies {
            Some(sealed) => Some(crypto.decrypt_credentials(&sealed)?),
            None => None,
        };

        Ok(UserPassCredential {
            username,
            password,
            cookies,
            user_agent,
        })
    }
}

View File

@@ -9,7 +9,7 @@ use crate::{
app::AppContextTrait,
errors::RecorderResult,
extract::{
mikan::{MikanEpisodeMeta, build_mikan_episode_homepage},
mikan::{MikanEpisodeMeta, build_mikan_episode_homepage_url},
rawname::parse_episode_meta_from_raw_name,
},
};
@@ -200,8 +200,10 @@ impl ActiveModel {
})
.ok()
.unwrap_or_default();
let homepage =
build_mikan_episode_homepage(ctx.mikan().base_url().clone(), &item.mikan_episode_id);
let homepage = build_mikan_episode_homepage_url(
ctx.mikan().base_url().clone(),
&item.mikan_episode_id,
);
Ok(Self {
mikan_episode_id: ActiveValue::Set(Some(item.mikan_episode_id)),

View File

@@ -1,5 +1,6 @@
pub mod auth;
pub mod bangumi;
pub mod credential_3rd;
pub mod downloaders;
pub mod downloads;
pub mod episodes;

View File

@@ -11,7 +11,7 @@ use crate::{
errors::RecorderResult,
extract::{
mikan::{
build_mikan_bangumi_homepage, build_mikan_bangumi_rss_link,
build_mikan_bangumi_homepage_url, build_mikan_bangumi_rss_url,
extract_mikan_bangumi_meta_from_bangumi_homepage,
extract_mikan_episode_meta_from_episode_homepage,
extract_mikan_rss_channel_from_rss_link,
@@ -54,6 +54,7 @@ pub struct Model {
pub category: SubscriptionCategory,
pub source_url: String,
pub enabled: bool,
pub credential_id: Option<i32>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -74,6 +75,14 @@ pub enum Relation {
SubscriptionEpisode,
#[sea_orm(has_many = "super::subscription_bangumi::Entity")]
SubscriptionBangumi,
#[sea_orm(
belongs_to = "super::credential_3rd::Entity",
from = "Column::CredentialId",
to = "super::credential_3rd::Column::Id",
on_update = "Cascade",
on_delete = "SetNull"
)]
Credential3rd,
}
impl Related<super::subscribers::Entity> for Entity {
@@ -122,6 +131,12 @@ impl Related<super::episodes::Entity> for Entity {
}
}
impl Related<super::credential_3rd::Entity> for Entity {
fn to() -> RelationDef {
Relation::Credential3rd.def()
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
pub enum RelatedEntity {
#[sea_orm(entity = "super::subscribers::Entity")]
@@ -134,6 +149,8 @@ pub enum RelatedEntity {
SubscriptionEpisode,
#[sea_orm(entity = "super::subscription_bangumi::Entity")]
SubscriptionBangumi,
#[sea_orm(entity = "super::credential_3rd::Entity")]
Credential3rd,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -270,12 +287,12 @@ impl Model {
for ((mikan_bangumi_id, mikan_fansub_id), new_ep_metas) in new_mikan_bangumi_groups
{
let mikan_base_url = ctx.mikan().base_url();
let bgm_homepage = build_mikan_bangumi_homepage(
let bgm_homepage = build_mikan_bangumi_homepage_url(
mikan_base_url.clone(),
&mikan_bangumi_id,
Some(&mikan_fansub_id),
);
let bgm_rss_link = build_mikan_bangumi_rss_link(
let bgm_rss_link = build_mikan_bangumi_rss_url(
mikan_base_url.clone(),
&mikan_bangumi_id,
Some(&mikan_fansub_id),

View File

@@ -0,0 +1,4 @@
use serde::{Deserialize, Serialize};
/// Configuration for the task subsystem. Currently has no fields; it exists so
/// the section can be (de)serialized as part of the application config.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskConfig {}

View File

@@ -1,279 +0,0 @@
use std::{borrow::Cow, sync::Arc};
use async_stream::stream;
use futures::{Stream, StreamExt, pin_mut};
use serde::{Serialize, de::DeserializeOwned};
use tokio::sync::{RwLock, mpsc};
use crate::{
app::AppContextTrait,
errors::app_error::{RecorderError, RecorderResult},
models,
};
/// Identifying metadata attached to a stream task.
pub struct TaskMeta {
/// Subscriber the task belongs to.
pub subscriber_id: i32,
/// Identifier of the task.
pub task_id: i32,
/// Name of the task kind; either a static string or an owned one (e.g. when
/// restored from a stored task row).
pub task_kind: Cow<'static, str>,
}
/// Broadcast channel that remembers every value it has seen, so that receivers
/// created later first get the full history and then the live values.
pub struct ReplayChannel<T: Send + Sync + Clone + 'static> {
// Producer side: values sent here are picked up by the forwarding task spawned in `new`.
sender: mpsc::UnboundedSender<T>,
// One sender per registered receiver; the forwarding task fans values out to these.
channels: Arc<RwLock<Vec<mpsc::UnboundedSender<T>>>>,
// Initial history plus every value forwarded so far, in arrival order.
buffer: Arc<RwLock<Vec<T>>>,
}
impl<T: Send + Sync + Clone + 'static> ReplayChannel<T> {
/// Creates a channel whose replay buffer is pre-filled with `history` and
/// spawns the forwarding task via `tokio::spawn` (so this must be called from
/// within a Tokio runtime).
///
/// The task runs until every clone of the producer sender has been dropped.
pub fn new(history: Vec<T>) -> Self {
let (tx, mut rx) = mpsc::unbounded_channel::<T>();
let channels = Arc::new(RwLock::new(Vec::<mpsc::UnboundedSender<T>>::new()));
let buffer = Arc::new(RwLock::new(history));
{
let channels = channels.clone();
let buffer = buffer.clone();
tokio::spawn(async move {
loop {
match rx.recv().await {
Some(value) => {
// The buffer write lock is taken BEFORE fan-out and held until the value
// is appended. `receiver()` holds the buffer read lock while it replays
// and registers, so a value is either replayed or fanned out to a new
// receiver, never both and never neither.
let mut w = buffer.write().await;
let senders = channels.read().await;
for s in senders.iter() {
// Closed subscriber senders are skipped but stay in the list.
if !s.is_closed() {
if let Err(err) = s.send(value.clone()) {
tracing::error!(err = %err, "replay-channel broadcast to other subscribers error");
}
}
}
w.push(value);
}
None => {
// All producer senders are gone: drop the subscriber senders so their
// receivers observe the end of the stream, then stop the task.
drop(rx);
let mut cs = channels.write().await;
cs.clear();
break;
}
}
}
});
}
Self {
sender: tx,
channels,
buffer,
}
}
/// Returns the producer-side sender.
pub fn sender(&self) -> &mpsc::UnboundedSender<T> {
&self.sender
}
/// Creates a new receiver that first yields every buffered value and then,
/// if the producer side is still open, every value forwarded afterwards.
pub async fn receiver(&self) -> mpsc::UnboundedReceiver<T> {
let (tx, rx) = mpsc::unbounded_channel();
// This read guard stays alive until the function returns, which keeps the
// forwarding task (it needs the write lock) from pushing in between replay
// and registration below.
let items = self.buffer.read().await;
for item in items.iter() {
if let Err(err) = tx.send(item.clone()) {
tracing::error!(err = %err, "replay-channel send replay value to other subscribers error");
}
}
// When the producer side is already closed, `tx` is dropped here, so the
// returned receiver ends right after the replayed values.
if !self.sender.is_closed() {
let mut sw = self.channels.write().await;
sw.push(tx);
}
rx
}
/// Drops all registered subscriber senders. The producer sender and the
/// forwarding task are not affected, and new receivers can still register.
pub async fn close(&self) {
let mut senders = self.channels.write().await;
senders.clear();
}
}
/// Minimal description of a stream task: its identity, its request payload and
/// the item type it produces.
pub trait StreamTaskCoreTrait: Sized {
/// Input of the task; must be serializable in both directions.
type Request: Serialize + DeserializeOwned;
/// Element produced by the task's stream; must be serializable in both directions.
type Item: Serialize + DeserializeOwned;
/// Identifier of this task.
fn task_id(&self) -> i32;
/// Name of this task's kind.
fn task_kind(&self) -> &str;
/// Builds a fresh task from its metadata and request.
fn new(meta: TaskMeta, request: Self::Request) -> Self;
/// The request this task was created with.
fn request(&self) -> &Self::Request;
}
/// Storage/replay side of a stream task: access to already produced items,
/// restoring from stored models, and the channel used to share a running stream.
pub trait StreamTaskReplayLayoutTrait: StreamTaskCoreTrait {
/// Items (or errors) already known for this task.
fn history(&self) -> &[Arc<RecorderResult<Self::Item>>];
/// Rebuilds a task from its stored task row and stored stream items.
fn resume_from_model(
task: models::tasks::Model,
stream_items: Vec<models::task_stream_item::Model>,
) -> RecorderResult<Self>;
/// Returns a receiver attached to the shared run, or `None` if no shared
/// channel has been initialized yet.
fn running_receiver(
&self,
) -> impl Future<Output = Option<mpsc::UnboundedReceiver<Arc<RecorderResult<Self::Item>>>>>;
/// Initializes the shared channel and returns its producer sender together
/// with a first receiver.
#[allow(clippy::type_complexity)]
fn init_receiver(
&self,
) -> impl Future<
Output = (
mpsc::UnboundedSender<Arc<RecorderResult<Self::Item>>>,
mpsc::UnboundedReceiver<Arc<RecorderResult<Self::Item>>>,
),
>;
/// Serializes a request to JSON, mapping the serde error into `RecorderError`.
fn serialize_request(request: Self::Request) -> RecorderResult<serde_json::Value> {
serde_json::to_value(request).map_err(RecorderError::from)
}
/// Serializes a stream item (the whole `RecorderResult`, i.e. success or error) to JSON.
fn serialize_item(item: RecorderResult<Self::Item>) -> RecorderResult<serde_json::Value> {
serde_json::to_value(item).map_err(RecorderError::from)
}
/// Deserializes a request from JSON.
fn deserialize_request(request: serde_json::Value) -> RecorderResult<Self::Request> {
serde_json::from_value(request).map_err(RecorderError::from)
}
/// Deserializes a stored stream item. The outer result reports a JSON decoding
/// failure; the inner result is the item's own recorded success or error.
fn deserialize_item(item: serde_json::Value) -> RecorderResult<RecorderResult<Self::Item>> {
serde_json::from_value(item).map_err(RecorderError::from)
}
}
/// Execution side of a stream task.
pub trait StreamTaskRunnerTrait: StreamTaskCoreTrait {
/// Produces the task's item stream for `request`.
///
/// `history` holds the items already produced by an earlier run; how an
/// implementation uses it (e.g. to skip work) is up to the implementation.
fn run(
context: Arc<dyn AppContextTrait>,
request: &Self::Request,
history: &[Arc<RecorderResult<Self::Item>>],
) -> impl Stream<Item = RecorderResult<Self::Item>>;
}
/// Combines running and replaying: lets several consumers share one execution
/// of a task.
pub trait StreamTaskReplayRunnerTrait: StreamTaskRunnerTrait + StreamTaskReplayLayoutTrait {
/// Returns a stream of the task's items.
///
/// If a shared channel already exists, the stream just relays what that
/// channel delivers. Otherwise this call initializes the channel, drives
/// `Self::run` itself, publishes every item to the channel and yields it.
///
/// NOTE(review): the "is there a channel?" check and the initialization are
/// two separate awaits, so two concurrent first callers could presumably both
/// take the `else` branch — confirm whether callers serialize this.
fn run_shared(
&self,
context: Arc<dyn AppContextTrait>,
) -> impl Stream<Item = Arc<RecorderResult<Self::Item>>> {
stream! {
if let Some(mut receiver) = self.running_receiver().await {
// Follower: relay items from the existing shared channel.
while let Some(item) = receiver.recv().await {
yield item
}
} else {
// Leader: the receiver half returned by `init_receiver` is discarded, since
// this branch yields the items directly from the underlying stream.
let (tx, _) = self.init_receiver().await;
let stream = Self::run(context, self.request(), self.history());
pin_mut!(stream);
while let Some(item) = stream.next().await {
let item = Arc::new(item);
// A failed publish is only logged; the item is still yielded to this caller.
if let Err(err) = tx.send(item.clone()) {
tracing::error!(task_id = self.task_id(), task_kind = self.task_kind(), err = %err, "run shared send error");
}
yield item
}
};
}
}
}
/// Default in-memory layout for a replayable stream task.
pub struct StandardStreamTaskReplayLayout<Request, Item>
where
Request: Serialize + DeserializeOwned,
Item: Serialize + DeserializeOwned + Sync + Send + 'static,
{
/// Identity of the task.
pub meta: TaskMeta,
/// Request the task was created with.
pub request: Request,
/// Items already produced (empty for a new task, restored for a resumed one).
pub history: Vec<Arc<RecorderResult<Item>>>,
/// Shared replay channel; `None` until `init_receiver` has been called.
#[allow(clippy::type_complexity)]
pub channel: Arc<RwLock<Option<ReplayChannel<Arc<RecorderResult<Item>>>>>>,
}
impl<Request, Item> StreamTaskCoreTrait for StandardStreamTaskReplayLayout<Request, Item>
where
    Request: Serialize + DeserializeOwned,
    Item: Serialize + DeserializeOwned + Sync + Send + 'static,
{
    type Request = Request;
    type Item = Item;

    /// Creates a task with no recorded history and no shared channel yet.
    fn new(meta: TaskMeta, request: Self::Request) -> Self {
        let history = Vec::new();
        let channel = Arc::new(RwLock::new(None));
        Self {
            meta,
            request,
            history,
            channel,
        }
    }

    /// Task identifier taken from the metadata.
    fn task_id(&self) -> i32 {
        self.meta.task_id
    }

    /// Task kind name taken from the metadata.
    fn task_kind(&self) -> &str {
        &self.meta.task_kind
    }

    /// The request this task was created with.
    fn request(&self) -> &Self::Request {
        &self.request
    }
}
impl<Request, Item> StreamTaskReplayLayoutTrait for StandardStreamTaskReplayLayout<Request, Item>
where
    Request: Serialize + DeserializeOwned,
    Item: Serialize + DeserializeOwned + Sync + Send + 'static,
{
    /// Items already recorded for this task.
    fn history(&self) -> &[Arc<RecorderResult<Self::Item>>] {
        self.history.as_slice()
    }

    /// Restores a task from its stored row and stored stream items.
    ///
    /// The request is decoded first, then the items in order; the first decoding
    /// failure aborts the restore. The shared channel starts out uninitialized.
    fn resume_from_model(
        task: models::tasks::Model,
        stream_items: Vec<models::task_stream_item::Model>,
    ) -> RecorderResult<Self> {
        let meta = TaskMeta {
            task_id: task.id,
            subscriber_id: task.subscriber_id,
            task_kind: Cow::Owned(task.task_type),
        };
        let request = Self::deserialize_request(task.request_data)?;

        let mut history = Vec::with_capacity(stream_items.len());
        for stored in stream_items {
            let decoded = Self::deserialize_item(stored.item)?;
            history.push(Arc::new(decoded));
        }

        Ok(Self {
            meta,
            request,
            history,
            channel: Arc::new(RwLock::new(None)),
        })
    }

    /// Attaches a new receiver to the shared channel, if one has been initialized.
    async fn running_receiver(
        &self,
    ) -> Option<mpsc::UnboundedReceiver<Arc<RecorderResult<Self::Item>>>> {
        // The read guard is held while the receiver is being created.
        let slot = self.channel.read().await;
        match slot.as_ref() {
            Some(shared) => Some(shared.receiver().await),
            None => None,
        }
    }

    /// Creates the shared channel seeded with the recorded history, stores it,
    /// and hands back its producer sender plus a first receiver.
    async fn init_receiver(
        &self,
    ) -> (
        mpsc::UnboundedSender<Arc<RecorderResult<Self::Item>>>,
        mpsc::UnboundedReceiver<Arc<RecorderResult<Self::Item>>>,
    ) {
        let shared = ReplayChannel::new(self.history.clone());
        let first_receiver = shared.receiver().await;
        let producer = shared.sender().clone();

        // Publish the channel; the write guard is released at the end of this statement.
        *self.channel.write().await = Some(shared);

        (producer, first_receiver)
    }
}

View File

@@ -1,37 +0,0 @@
use std::sync::Arc;
use futures::Stream;
use serde::{Deserialize, Serialize};
use url::Url;
use crate::{
app::AppContextTrait,
errors::RecorderResult,
extract::mikan::{MikanAuthSecrecy, MikanBangumiMeta, web_extract},
tasks::core::{StandardStreamTaskReplayLayout, StreamTaskRunnerTrait},
};
/// Request payload for extracting bangumi metadata from a Mikan
/// "my bangumi" page.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExtractMikanBangumisMetaFromMyBangumiRequest {
/// URL of the page to extract from.
pub my_bangumi_page_url: Url,
/// Optional authentication data forwarded to the extractor.
pub auth_secrecy: Option<MikanAuthSecrecy>,
}
/// Replayable stream task that takes an
/// `ExtractMikanBangumisMetaFromMyBangumiRequest` and yields `MikanBangumiMeta` items.
pub type ExtractMikanBangumisMetaFromMyBangumiTask =
StandardStreamTaskReplayLayout<ExtractMikanBangumisMetaFromMyBangumiRequest, MikanBangumiMeta>;
impl StreamTaskRunnerTrait for ExtractMikanBangumisMetaFromMyBangumiTask {
    /// Runs the extraction by delegating to
    /// `web_extract::extract_mikan_bangumis_meta_from_my_bangumi_page`.
    ///
    /// * `context` - application context, handed to the extractor as-is.
    /// * `request` - page URL and optional auth data; both are cloned because the
    ///   extractor takes them by value.
    /// * `history` - items from an earlier run, forwarded to the extractor.
    fn run(
        context: Arc<dyn AppContextTrait>,
        request: &Self::Request,
        history: &[Arc<RecorderResult<Self::Item>>],
    ) -> impl Stream<Item = RecorderResult<Self::Item>> {
        // `context` is already owned here, so it is moved instead of being cloned again.
        web_extract::extract_mikan_bangumis_meta_from_my_bangumi_page(
            context,
            request.my_bangumi_page_url.clone(),
            request.auth_secrecy.clone(),
            history,
        )
    }
}

View File

@@ -0,0 +1,172 @@
use std::{ops::Deref, sync::Arc};
use apalis::prelude::*;
use apalis_sql::postgres::PostgresStorage;
use fetch::fetch_html;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::{
app::AppContextTrait,
errors::{RecorderError, RecorderResult},
extract::mikan::{
MikanBangumiMeta, MikanSeasonStr, build_mikan_season_flow_url,
extract_mikan_bangumi_indices_meta_from_season_flow_fragment,
web_extract::{
MikanBangumiIndexMeta, build_mikan_bangumi_expand_subscribed_fragment_url,
extract_mikan_bangumi_meta_from_expand_subscribed_fragment,
},
},
};
/// Name of the season-subscription extraction task. Where it is consumed is
/// not visible in this part of the file.
const TASK_NAME: &str = "mikan_extract_season_subscription";
/// Input of the first step (`extract_mikan_season_subscription`): which season
/// to scan and on whose behalf.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExtractMikanSeasonSubscriptionTask {
/// Passed through unchanged to the next step.
pub task_id: i32,
/// Year used to build the season flow URL.
pub year: i32,
/// Season used to build the season flow URL.
pub season_str: MikanSeasonStr,
/// Credential used to fork the Mikan client.
pub credential_id: i32,
/// Passed through unchanged to the next step.
pub subscription_id: i32,
/// Passed through unchanged to the next step.
pub subscriber_id: i32,
}
/// Output of the first step and input of the second step
/// (`extract_mikan_season_subscription_fansubs`): the original job fields plus
/// the bangumi indices found in the season flow.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExtractMikanSeasonSubscriptionFansubsTask {
pub task_id: i32,
pub year: i32,
pub season_str: MikanSeasonStr,
pub credential_id: i32,
pub subscription_id: i32,
pub subscriber_id: i32,
/// Index entries extracted from the season flow fragment.
pub bangumi_indices: Vec<MikanBangumiIndexMeta>,
}
/// Result type of the second step: the job fields plus the collected bangumi
/// metadata.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExtractMikanSeasonSubscriptionTaskResult {
pub task_id: i32,
pub year: i32,
pub season_str: MikanSeasonStr,
pub credential_id: i32,
pub subscription_id: i32,
pub subscriber_id: i32,
/// Bangumi metadata gathered for the season.
pub bangumi_metas: Vec<MikanBangumiMeta>,
}
/// First step of the season-subscription extraction.
///
/// Forks a Mikan client bound to `job.credential_id`, downloads the season flow
/// fragment for `job.year` / `job.season_str` and extracts the bangumi indices
/// from it. When nothing is found and the client reports that it is not logged
/// in, it logs in and repeats the download and extraction once.
///
/// Returns `GoTo::Next` carrying the job fields plus the extracted indices
/// (possibly empty).
///
/// # Errors
/// Propagates failures from forking the client, fetching the fragment, the
/// login-state check and the login itself.
pub async fn extract_mikan_season_subscription(
    job: ExtractMikanSeasonSubscriptionTask,
    data: Data<Arc<dyn AppContextTrait>>,
) -> RecorderResult<GoTo<ExtractMikanSeasonSubscriptionFansubsTask>> {
    let ctx = data.deref();

    let mikan_client = ctx
        .mikan()
        .fork_with_credential(ctx.clone(), Some(job.credential_id))
        .await?;

    let base_url = mikan_client.base_url().clone();
    let flow_url = build_mikan_season_flow_url(base_url.clone(), job.year, job.season_str);

    let first_fragment = fetch_html(&mikan_client, flow_url.clone()).await?;
    let first_pass = extract_mikan_bangumi_indices_meta_from_season_flow_fragment(
        &first_fragment,
        base_url.clone(),
    );

    // The login state is only queried when the first pass came back empty.
    let bangumi_indices = if first_pass.is_empty() && !mikan_client.has_login().await? {
        mikan_client.login().await?;
        let retried_fragment = fetch_html(&mikan_client, flow_url.clone()).await?;
        extract_mikan_bangumi_indices_meta_from_season_flow_fragment(
            &retried_fragment,
            base_url.clone(),
        )
    } else {
        first_pass
    };

    let next_job = ExtractMikanSeasonSubscriptionFansubsTask {
        task_id: job.task_id,
        year: job.year,
        season_str: job.season_str,
        credential_id: job.credential_id,
        subscription_id: job.subscription_id,
        subscriber_id: job.subscriber_id,
        bangumi_indices,
    };
    Ok(GoTo::Next(next_job))
}
pub async fn extract_mikan_season_subscription_fansubs(
job: ExtractMikanSeasonSubscriptionFansubsTask,
data: Data<Arc<dyn AppContextTrait>>,
) -> RecorderResult<GoTo<ExtractMikanSeasonSubscriptionTaskResult>> {
let ctx = data.deref();
let mikan_client = ctx
.mikan()
.fork_with_credential(ctx.clone(), Some(job.credential_id))
.await?;
let bangumi_indices = job.bangumi_indices;
let mut bangumi_metas = vec![];
let mikan_base_url = mikan_client.base_url().clone();
for bangumi_index in bangumi_indices {
let bangumi_title = bangumi_index.bangumi_title.clone();
let bangumi_expand_subscribed_fragment_url =
build_mikan_bangumi_expand_subscribed_fragment_url(
mikan_base_url.clone(),
&bangumi_index.mikan_bangumi_id,
);
let bangumi_expand_subscribed_fragment =
fetch_html(&mikan_client, bangumi_expand_subscribed_fragment_url).await?;
let bangumi_meta = extract_mikan_bangumi_meta_from_expand_subscribed_fragment(
bangumi_index,
&bangumi_expand_subscribed_fragment,
mikan_base_url.clone(),
)
.with_whatever_context::<_, String, RecorderError>(|| {
format!(
"failed to extract mikan bangumi fansub of title = {}",
bangumi_title
)
})?;
bangumi_metas.push(bangumi_meta);
}
Ok(GoTo::Done(ExtractMikanSeasonSubscriptionTaskResult {
bangumi_metas,
credential_id: job.credential_id,
season_str: job.season_str,
subscriber_id: job.subscriber_id,
subscription_id: job.subscription_id,
task_id: job.task_id,
year: job.year,
}))
}
/// Builds the two-step season-subscription worker and registers it on
/// `monitor`.
///
/// The worker is named after `TASK_NAME`, carries `ctx` as shared data, and
/// is backed by a `PostgresStorage` created from the database service's
/// connection pool.
///
/// # Returns
///
/// The monitor with the worker registered, plus a handle to the storage the
/// worker uses. The body contains no fallible call, so this currently always
/// returns `Ok`.
pub fn register_extract_mikan_season_subscription_task(
    monitor: Monitor,
    ctx: Arc<dyn AppContextTrait>,
) -> RecorderResult<(Monitor, PostgresStorage<StepRequest<serde_json::Value>>)> {
    // Step order matters: the first step's `GoTo::Next` payload is the second
    // step's input.
    let step_pipeline = StepBuilder::new()
        .step_fn(extract_mikan_season_subscription)
        .step_fn(extract_mikan_season_subscription_fansubs);

    let storage = PostgresStorage::new(ctx.db().get_postgres_connection_pool().clone());

    let worker = WorkerBuilder::new(TASK_NAME)
        .catch_panic()
        .enable_tracing()
        .data(ctx)
        .backend(storage.clone())
        .build_stepped(step_pipeline);

    let monitor = monitor.register(worker);
    Ok((monitor, storage))
}

View File

@@ -1 +1,5 @@
pub mod extract_mikan_bangumis_meta_from_my_bangumi;
mod extract_season_subscription;
pub use extract_season_subscription::{
ExtractMikanSeasonSubscriptionTask, register_extract_mikan_season_subscription_task,
};

View File

@@ -1,4 +1,6 @@
pub mod core;
pub mod config;
pub mod mikan;
pub mod service;
pub mod registry;
pub use config::TaskConfig;
pub use service::TaskService;

View File

@@ -1 +0,0 @@

View File

@@ -1,4 +1,41 @@
#[derive(Debug)]
pub struct TaskService {}
use std::{fmt::Debug, sync::Arc};
impl TaskService {}
use apalis::prelude::*;
use apalis_sql::postgres::PostgresStorage;
use tokio::sync::Mutex;
use super::{TaskConfig, mikan::register_extract_mikan_season_subscription_task};
use crate::{app::AppContextTrait, errors::RecorderResult};
/// Service that holds the task `Monitor` together with the storage handles of
/// the tasks registered on it.
pub struct TaskService {
/// Task configuration supplied at construction; the only field printed by
/// the `Debug` impl.
config: TaskConfig,
// Not read by any code visible alongside this definition, hence the lint
// allowance.
// NOTE(review): presumably retained so the service keeps ownership of the
// registered workers until something runs the monitor — confirm.
#[allow(dead_code)]
monitor: Arc<Mutex<Monitor>>,
/// Storage handle returned by
/// `register_extract_mikan_season_subscription_task`.
/// NOTE(review): presumably what callers use to enqueue jobs — confirm.
pub extract_mikan_season_subscription_task_storage:
PostgresStorage<StepRequest<serde_json::Value>>,
}
impl TaskService {
    /// Creates the service from `config` and the application context.
    ///
    /// A fresh `Monitor` is created and the season-subscription extraction
    /// worker is registered on it. The monitor is then stored behind an
    /// `Arc<Mutex<_>>` next to the storage handle returned by the
    /// registration.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by
    /// `register_extract_mikan_season_subscription_task`.
    pub async fn from_config_and_ctx(
        config: TaskConfig,
        ctx: Arc<dyn AppContextTrait>,
    ) -> RecorderResult<Self> {
        let (registered_monitor, season_subscription_storage) =
            register_extract_mikan_season_subscription_task(Monitor::new(), ctx.clone())?;

        let service = Self {
            config,
            monitor: Arc::new(Mutex::new(registered_monitor)),
            extract_mikan_season_subscription_task_storage: season_subscription_storage,
        };
        Ok(service)
    }
}
/// Manual `Debug` implementation that prints only the `config` field; the
/// other fields of `TaskService` are left out of the output.
impl Debug for TaskService {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("TaskService");
        repr.field("config", &self.config);
        repr.finish()
    }
}

View File

@@ -1,3 +1,5 @@
use std::fmt::Debug;
use typed_builder::TypedBuilder;
use crate::app::AppContextTrait;
@@ -13,12 +15,20 @@ pub struct UnitTestAppContext {
auth: Option<crate::auth::AuthService>,
graphql: Option<crate::graphql::GraphQLService>,
storage: Option<crate::storage::StorageService>,
crypto: Option<crate::crypto::CryptoService>,
tasks: Option<crate::tasks::TaskService>,
#[builder(default = Some(String::from(env!("CARGO_MANIFEST_DIR"))))]
working_dir: Option<String>,
#[builder(default = crate::app::Environment::Testing, setter(!strip_option))]
environment: crate::app::Environment,
}
/// Manual `Debug` implementation that prints only the type name; none of the
/// fields are included in the output.
impl Debug for UnitTestAppContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("UnitTestAppContext")
    }
}
impl AppContextTrait for UnitTestAppContext {
fn logger(&self) -> &crate::logger::LoggerService {
self.logger.as_ref().expect("should set logger")
@@ -59,4 +69,12 @@ impl AppContextTrait for UnitTestAppContext {
/// Returns the configured working directory.
///
/// # Panics
///
/// Panics with "should set working_dir" when the field is `None`.
fn working_dir(&self) -> &String {
    let configured = self.working_dir.as_ref();
    configured.expect("should set working_dir")
}
/// Returns the crypto service supplied to this test context.
///
/// # Panics
///
/// Panics with "should set crypto" when the field is `None`.
fn crypto(&self) -> &crate::crypto::CryptoService {
    let configured = self.crypto.as_ref();
    configured.expect("should set crypto")
}
/// Returns the task service supplied to this test context (stored in the
/// `tasks` field).
///
/// # Panics
///
/// Panics with "should set tasks" when the field is `None`.
fn task(&self) -> &crate::tasks::TaskService {
    let configured = self.tasks.as_ref();
    configured.expect("should set tasks")
}
}