Compare commits
4 Commits
571caf50ff
...
c8501b1768
Author | SHA1 | Date | |
---|---|---|---|
c8501b1768 | |||
3a8eb88e1a | |||
003d8840fd | |||
41ff5c2a11 |
10
Cargo.lock
generated
10
Cargo.lock
generated
@ -1580,6 +1580,15 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "croner"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c344b0690c1ad1c7176fe18eb173e0c927008fdaaa256e40dfd43ddd149c0843"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.15"
|
||||
@ -6750,6 +6759,7 @@ dependencies = [
|
||||
"cocoon",
|
||||
"color-eyre",
|
||||
"convert_case 0.8.0",
|
||||
"croner",
|
||||
"ctor",
|
||||
"dotenvy",
|
||||
"downloader",
|
||||
|
@ -1,17 +0,0 @@
|
||||
HOST="konobangu.com"
|
||||
DATABASE_URL = "postgres://konobangu:konobangu@localhost:5432/konobangu"
|
||||
STORAGE_DATA_DIR = "./data"
|
||||
AUTH_TYPE = "basic" # or oidc
|
||||
BASIC_USER = "konobangu"
|
||||
BASIC_PASSWORD = "konobangu"
|
||||
# OIDC_ISSUER="https://auth.logto.io/oidc"
|
||||
# OIDC_AUDIENCE = "https://konobangu.com/api"
|
||||
# OIDC_CLIENT_ID = "client_id"
|
||||
# OIDC_CLIENT_SECRET = "client_secret" # optional
|
||||
# OIDC_EXTRA_SCOPES = "read:konobangu write:konobangu"
|
||||
# OIDC_EXTRA_CLAIM_KEY = ""
|
||||
# OIDC_EXTRA_CLAIM_VALUE = ""
|
||||
# MIKAN_PROXY = ""
|
||||
# MIKAN_PROXY_AUTH_HEADER = ""
|
||||
# MIKAN_NO_PROXY = ""
|
||||
# MIKAN_PROXY_ACCEPT_INVALID_CERTS = "true"
|
@ -1,17 +0,0 @@
|
||||
HOST="konobangu.com"
|
||||
DATABASE_URL = "postgres://konobangu:konobangu@localhost:5432/konobangu"
|
||||
STORAGE_DATA_DIR = "./data"
|
||||
AUTH_TYPE = "basic" # or oidc
|
||||
BASIC_USER = "konobangu"
|
||||
BASIC_PASSWORD = "konobangu"
|
||||
# OIDC_ISSUER="https://auth.logto.io/oidc"
|
||||
# OIDC_AUDIENCE = "https://konobangu.com/api"
|
||||
# OIDC_CLIENT_ID = "client_id"
|
||||
# OIDC_CLIENT_SECRET = "client_secret" # optional
|
||||
# OIDC_EXTRA_SCOPES = "read:konobangu write:konobangu"
|
||||
# OIDC_EXTRA_CLAIM_KEY = ""
|
||||
# OIDC_EXTRA_CLAIM_VALUE = ""
|
||||
MIKAN_PROXY = "http://127.0.0.1:8899"
|
||||
# MIKAN_PROXY_AUTH_HEADER = ""
|
||||
# MIKAN_NO_PROXY = ""
|
||||
MIKAN_PROXY_ACCEPT_INVALID_CERTS = true
|
18
apps/recorder/.env.development
Normal file
18
apps/recorder/.env.development
Normal file
@ -0,0 +1,18 @@
|
||||
LOGGER__LEVEL = "debug"
|
||||
|
||||
DATABASE__URI = "postgres://konobangu:konobangu@localhost:5432/konobangu"
|
||||
|
||||
AUTH__AUTH_TYPE = "basic"
|
||||
AUTH__BASIC_USER = "konobangu"
|
||||
AUTH__BASIC_PASSWORD = "konobangu"
|
||||
|
||||
# AUTH__OIDC_ISSUER = "https://auth.logto.io/oidc"
|
||||
# AUTH__OIDC_AUDIENCE = "https://konobangu.com/api"
|
||||
# AUTH__OIDC_CLIENT_ID = "client_id"
|
||||
# AUTH__OIDC_CLIENT_SECRET = "client_secret" # optional
|
||||
# AUTH__OIDC_EXTRA_SCOPES = "read:konobangu write:konobangu"
|
||||
# AUTH__OIDC_EXTRA_CLAIM_KEY = ""
|
||||
# AUTH__OIDC_EXTRA_CLAIM_VALUE = ""
|
||||
|
||||
MIKAN__HTTP_CLIENT__PROXY__ACCEPT_INVALID_CERTS = true
|
||||
MIKAN__HTTP_CLIENT__PROXY__SERVER = "http://127.0.0.1:8899"
|
15
apps/recorder/.env.production.example
Normal file
15
apps/recorder/.env.production.example
Normal file
@ -0,0 +1,15 @@
|
||||
HOST="konobangu.com"
|
||||
|
||||
DATABASE__URI = "postgres://konobangu:konobangu@localhost:5432/konobangu"
|
||||
|
||||
AUTH__AUTH_TYPE = "basic" # or oidc
|
||||
AUTH__BASIC_USER = "konobangu"
|
||||
AUTH__BASIC_PASSWORD = "konobangu"
|
||||
|
||||
# AUTH__OIDC_ISSUER="https://auth.logto.io/oidc"
|
||||
# AUTH__OIDC_AUDIENCE = "https://konobangu.com/api"
|
||||
# AUTH__OIDC_CLIENT_ID = "client_id"
|
||||
# AUTH__OIDC_CLIENT_SECRET = "client_secret" # optional
|
||||
# AUTH__OIDC_EXTRA_SCOPES = "read:konobangu write:konobangu"
|
||||
# AUTH__OIDC_EXTRA_CLAIM_KEY = ""
|
||||
# AUTH__OIDC_EXTRA_CLAIM_VALUE = ""
|
4
apps/recorder/.gitignore
vendored
4
apps/recorder/.gitignore
vendored
@ -28,4 +28,6 @@ dist/
|
||||
temp/*
|
||||
!temp/.gitkeep
|
||||
tests/resources/mikan/classic_episodes/*/*
|
||||
!tests/resources/mikan/classic_episodes/parquet/tiny.parquet
|
||||
!tests/resources/mikan/classic_episodes/parquet/tiny.parquet
|
||||
webui/
|
||||
data/
|
@ -126,7 +126,7 @@ seaography = { version = "1.1", features = [
|
||||
"with-postgres-array",
|
||||
"with-json-as-scalar",
|
||||
] }
|
||||
tower = "0.5.2"
|
||||
tower = { version = "0.5.2", features = ["util"] }
|
||||
tower-http = { version = "0.6", features = [
|
||||
"trace",
|
||||
"catch-panic",
|
||||
@ -164,6 +164,7 @@ quick-xml = { version = "0.37.5", features = [
|
||||
"serde-types",
|
||||
"serde",
|
||||
] }
|
||||
croner = "2.2.0"
|
||||
|
||||
[dev-dependencies]
|
||||
inquire = { workspace = true }
|
||||
|
@ -4,8 +4,8 @@
|
||||
enable = true
|
||||
# Enable pretty backtrace (sets RUST_BACKTRACE=1)
|
||||
pretty_backtrace = true
|
||||
level = "info"
|
||||
# Log level, options: trace, debug, info, warn or error.
|
||||
level = "debug"
|
||||
# Define the logging format. options: compact, pretty or Json
|
||||
format = "compact"
|
||||
# By default the logger has filtering only logs that came from your code or logs that came from `loco` framework. to see all third party libraries
|
||||
@ -77,7 +77,7 @@ max_connections = 10
|
||||
auto_migrate = true
|
||||
|
||||
[storage]
|
||||
data_dir = '{{ get_env(name="STORAGE_DATA_DIR", default="./data") }}'
|
||||
data_dir = './data'
|
||||
|
||||
[mikan]
|
||||
base_url = "https://mikanani.me/"
|
||||
@ -89,26 +89,6 @@ leaky_bucket_initial_tokens = 1
|
||||
leaky_bucket_refill_tokens = 1
|
||||
leaky_bucket_refill_interval = 500
|
||||
|
||||
|
||||
[mikan.http_client.proxy]
|
||||
server = '{{ get_env(name="MIKAN_PROXY", default = "") }}'
|
||||
auth_header = '{{ get_env(name="MIKAN_PROXY_AUTH_HEADER", default = "") }}'
|
||||
no_proxy = '{{ get_env(name="MIKAN_NO_PROXY", default = "") }}'
|
||||
accept_invalid_certs = '{{ get_env(name="MIKAN_PROXY_ACCEPT_INVALID_CERTS", default = "false") }}'
|
||||
|
||||
|
||||
[auth]
|
||||
auth_type = '{{ get_env(name="AUTH_TYPE", default = "basic") }}'
|
||||
basic_user = '{{ get_env(name="BASIC_USER", default = "konobangu") }}'
|
||||
basic_password = '{{ get_env(name="BASIC_PASSWORD", default = "konobangu") }}'
|
||||
oidc_issuer = '{{ get_env(name="OIDC_ISSUER", default = "") }}'
|
||||
oidc_audience = '{{ get_env(name="OIDC_AUDIENCE", default = "") }}'
|
||||
oidc_client_id = '{{ get_env(name="OIDC_CLIENT_ID", default = "") }}'
|
||||
oidc_client_secret = '{{ get_env(name="OIDC_CLIENT_SECRET", default = "") }}'
|
||||
oidc_extra_scopes = '{{ get_env(name="OIDC_EXTRA_SCOPES", default = "") }}'
|
||||
oidc_extra_claim_key = '{{ get_env(name="OIDC_EXTRA_CLAIM_KEY", default = "") }}'
|
||||
oidc_extra_claim_value = '{{ get_env(name="OIDC_EXTRA_CLAIM_VALUE", default = "") }}'
|
||||
|
||||
[graphql]
|
||||
# depth_limit = inf
|
||||
# complexity_limit = inf
|
||||
|
@ -72,6 +72,11 @@ impl AppBuilder {
|
||||
}
|
||||
|
||||
pub async fn build(self) -> RecorderResult<App> {
|
||||
if self.working_dir != "." {
|
||||
std::env::set_current_dir(&self.working_dir)?;
|
||||
println!("set current dir to working dir: {}", self.working_dir);
|
||||
}
|
||||
|
||||
self.load_env().await?;
|
||||
|
||||
let config = self.load_config().await?;
|
||||
@ -86,22 +91,12 @@ impl AppBuilder {
|
||||
}
|
||||
|
||||
pub async fn load_env(&self) -> RecorderResult<()> {
|
||||
AppConfig::load_dotenv(
|
||||
&self.environment,
|
||||
&self.working_dir,
|
||||
self.dotenv_file.as_deref(),
|
||||
)
|
||||
.await?;
|
||||
AppConfig::load_dotenv(&self.environment, self.dotenv_file.as_deref()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn load_config(&self) -> RecorderResult<AppConfig> {
|
||||
let config = AppConfig::load_config(
|
||||
&self.environment,
|
||||
&self.working_dir,
|
||||
self.config_file.as_deref(),
|
||||
)
|
||||
.await?;
|
||||
let config = AppConfig::load_config(&self.environment, self.config_file.as_deref()).await?;
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
@ -136,7 +131,7 @@ impl AppBuilder {
|
||||
}
|
||||
|
||||
pub fn working_dir_from_manifest_dir(self) -> Self {
|
||||
let manifest_dir = if cfg!(debug_assertions) || cfg!(test) {
|
||||
let manifest_dir = if cfg!(debug_assertions) || cfg!(test) || cfg!(feature = "playground") {
|
||||
env!("CARGO_MANIFEST_DIR")
|
||||
} else {
|
||||
"./apps/recorder"
|
||||
|
@ -1,8 +1,13 @@
|
||||
use std::{fs, path::Path, str};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fs,
|
||||
path::Path,
|
||||
str::{self, FromStr},
|
||||
};
|
||||
|
||||
use figment::{
|
||||
Figment, Provider,
|
||||
providers::{Format, Json, Toml, Yaml},
|
||||
providers::{Env, Format, Json, Toml, Yaml},
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -55,8 +60,8 @@ impl AppConfig {
|
||||
format!(".{}.local", environment.full_name()),
|
||||
format!(".{}.local", environment.short_name()),
|
||||
String::from(".local"),
|
||||
environment.full_name().to_string(),
|
||||
environment.short_name().to_string(),
|
||||
format!(".{}", environment.full_name()),
|
||||
format!(".{}", environment.short_name()),
|
||||
String::from(""),
|
||||
]
|
||||
}
|
||||
@ -65,6 +70,102 @@ impl AppConfig {
|
||||
Toml::string(DEFAULT_CONFIG_MIXIN)
|
||||
}
|
||||
|
||||
fn build_enhanced_tera_engine() -> tera::Tera {
|
||||
let mut tera = tera::Tera::default();
|
||||
tera.register_filter(
|
||||
"cast_to",
|
||||
|value: &tera::Value,
|
||||
args: &HashMap<String, tera::Value>|
|
||||
-> tera::Result<tera::Value> {
|
||||
let target_type = args
|
||||
.get("type")
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or_else(|| tera::Error::msg("invalid target type: should be string"))?;
|
||||
|
||||
let target_type = TeraCastToFilterType::from_str(target_type)
|
||||
.map_err(|e| tera::Error::msg(format!("invalid target type: {e}")))?;
|
||||
|
||||
let input_str = value.as_str().unwrap_or("");
|
||||
|
||||
match target_type {
|
||||
TeraCastToFilterType::Boolean => {
|
||||
let is_true = matches!(input_str.to_lowercase().as_str(), "true" | "1");
|
||||
let is_false = matches!(input_str.to_lowercase().as_str(), "false" | "0");
|
||||
if is_true {
|
||||
Ok(tera::Value::Bool(true))
|
||||
} else if is_false {
|
||||
Ok(tera::Value::Bool(false))
|
||||
} else {
|
||||
Err(tera::Error::msg(
|
||||
"target type is bool but value is not a boolean like true, false, \
|
||||
1, 0",
|
||||
))
|
||||
}
|
||||
}
|
||||
TeraCastToFilterType::Integer => {
|
||||
let parsed = input_str.parse::<i64>().map_err(|e| {
|
||||
tera::Error::call_filter("invalid integer".to_string(), e)
|
||||
})?;
|
||||
Ok(tera::Value::Number(serde_json::Number::from(parsed)))
|
||||
}
|
||||
TeraCastToFilterType::Unsigned => {
|
||||
let parsed = input_str.parse::<u64>().map_err(|e| {
|
||||
tera::Error::call_filter("invalid unsigned integer".to_string(), e)
|
||||
})?;
|
||||
Ok(tera::Value::Number(serde_json::Number::from(parsed)))
|
||||
}
|
||||
TeraCastToFilterType::Float => {
|
||||
let parsed = input_str.parse::<f64>().map_err(|e| {
|
||||
tera::Error::call_filter("invalid float".to_string(), e)
|
||||
})?;
|
||||
Ok(tera::Value::Number(
|
||||
serde_json::Number::from_f64(parsed).ok_or_else(|| {
|
||||
tera::Error::msg("failed to convert f64 to serde_json::Number")
|
||||
})?,
|
||||
))
|
||||
}
|
||||
TeraCastToFilterType::String => Ok(tera::Value::String(input_str.to_string())),
|
||||
TeraCastToFilterType::Null => Ok(tera::Value::Null),
|
||||
}
|
||||
},
|
||||
);
|
||||
tera.register_filter(
|
||||
"try_auto_cast",
|
||||
|value: &tera::Value,
|
||||
_args: &HashMap<String, tera::Value>|
|
||||
-> tera::Result<tera::Value> {
|
||||
let input_str = value.as_str().unwrap_or("");
|
||||
|
||||
if input_str == "null" {
|
||||
return Ok(tera::Value::Null);
|
||||
}
|
||||
|
||||
if matches!(input_str, "true" | "false") {
|
||||
return Ok(tera::Value::Bool(input_str == "true"));
|
||||
}
|
||||
|
||||
if let Ok(parsed) = input_str.parse::<i64>() {
|
||||
return Ok(tera::Value::Number(serde_json::Number::from(parsed)));
|
||||
}
|
||||
|
||||
if let Ok(parsed) = input_str.parse::<u64>() {
|
||||
return Ok(tera::Value::Number(serde_json::Number::from(parsed)));
|
||||
}
|
||||
|
||||
if let Ok(parsed) = input_str.parse::<f64>() {
|
||||
return Ok(tera::Value::Number(
|
||||
serde_json::Number::from_f64(parsed).ok_or_else(|| {
|
||||
tera::Error::msg("failed to convert f64 to serde_json::Number")
|
||||
})?,
|
||||
));
|
||||
}
|
||||
|
||||
Ok(tera::Value::String(input_str.to_string()))
|
||||
},
|
||||
);
|
||||
tera
|
||||
}
|
||||
|
||||
pub fn merge_provider_from_file(
|
||||
fig: Figment,
|
||||
filepath: impl AsRef<Path>,
|
||||
@ -72,11 +173,9 @@ impl AppConfig {
|
||||
) -> RecorderResult<Figment> {
|
||||
let content = fs::read_to_string(filepath)?;
|
||||
|
||||
let rendered = tera::Tera::one_off(
|
||||
&content,
|
||||
&tera::Context::from_value(serde_json::json!({}))?,
|
||||
false,
|
||||
)?;
|
||||
let mut tera_engine = AppConfig::build_enhanced_tera_engine();
|
||||
let rendered =
|
||||
tera_engine.render_str(&content, &tera::Context::from_value(serde_json::json!({}))?)?;
|
||||
|
||||
Ok(match ext {
|
||||
".toml" => fig.merge(Toml::string(&rendered)),
|
||||
@ -88,13 +187,12 @@ impl AppConfig {
|
||||
|
||||
pub async fn load_dotenv(
|
||||
environment: &Environment,
|
||||
working_dir: &str,
|
||||
dotenv_file: Option<&str>,
|
||||
) -> RecorderResult<()> {
|
||||
let try_dotenv_file_or_dirs = if dotenv_file.is_some() {
|
||||
vec![dotenv_file]
|
||||
} else {
|
||||
vec![Some(working_dir)]
|
||||
vec![Some(".")]
|
||||
};
|
||||
|
||||
let priority_suffix = &AppConfig::priority_suffix(environment);
|
||||
@ -111,11 +209,16 @@ impl AppConfig {
|
||||
for f in try_filenames.iter() {
|
||||
let p = try_dotenv_file_or_dir_path.join(f);
|
||||
if p.exists() && p.is_file() {
|
||||
println!("Loading dotenv file: {}", p.display());
|
||||
dotenvy::from_path(p)?;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else if try_dotenv_file_or_dir_path.is_file() {
|
||||
println!(
|
||||
"Loading dotenv file: {}",
|
||||
try_dotenv_file_or_dir_path.display()
|
||||
);
|
||||
dotenvy::from_path(try_dotenv_file_or_dir_path)?;
|
||||
break;
|
||||
}
|
||||
@ -127,13 +230,12 @@ impl AppConfig {
|
||||
|
||||
pub async fn load_config(
|
||||
environment: &Environment,
|
||||
working_dir: &str,
|
||||
config_file: Option<&str>,
|
||||
) -> RecorderResult<AppConfig> {
|
||||
let try_config_file_or_dirs = if config_file.is_some() {
|
||||
vec![config_file]
|
||||
} else {
|
||||
vec![Some(working_dir)]
|
||||
vec![Some(".")]
|
||||
};
|
||||
|
||||
let allowed_extensions = &AppConfig::allowed_extension();
|
||||
@ -159,6 +261,7 @@ impl AppConfig {
|
||||
let p = try_config_file_or_dir_path.join(f);
|
||||
if p.exists() && p.is_file() {
|
||||
fig = AppConfig::merge_provider_from_file(fig, &p, ext)?;
|
||||
println!("Loaded config file: {}", p.display());
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -169,13 +272,52 @@ impl AppConfig {
|
||||
{
|
||||
fig =
|
||||
AppConfig::merge_provider_from_file(fig, try_config_file_or_dir_path, ext)?;
|
||||
println!(
|
||||
"Loaded config file: {}",
|
||||
try_config_file_or_dir_path.display()
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fig = fig.merge(Env::prefixed("").split("__").lowercase(true));
|
||||
|
||||
let app_config: AppConfig = fig.extract()?;
|
||||
|
||||
Ok(app_config)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
enum TeraCastToFilterType {
|
||||
#[serde(alias = "str")]
|
||||
String,
|
||||
#[serde(alias = "bool")]
|
||||
Boolean,
|
||||
#[serde(alias = "int")]
|
||||
Integer,
|
||||
#[serde(alias = "uint")]
|
||||
Unsigned,
|
||||
#[serde(alias = "float")]
|
||||
Float,
|
||||
#[serde(alias = "null")]
|
||||
Null,
|
||||
}
|
||||
|
||||
impl FromStr for TeraCastToFilterType {
|
||||
type Err = String;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"string" | "str" => Ok(TeraCastToFilterType::String),
|
||||
"boolean" | "bool" => Ok(TeraCastToFilterType::Boolean),
|
||||
"integer" | "int" => Ok(TeraCastToFilterType::Integer),
|
||||
"unsigned" | "uint" => Ok(TeraCastToFilterType::Unsigned),
|
||||
"float" => Ok(TeraCastToFilterType::Float),
|
||||
"null" => Ok(TeraCastToFilterType::Null),
|
||||
_ => Err(format!("invalid target type: {s}")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,11 +1,13 @@
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
|
||||
use axum::Router;
|
||||
use axum::{Router, middleware::from_fn_with_state};
|
||||
use tokio::{net::TcpSocket, signal};
|
||||
use tower_http::services::{ServeDir, ServeFile};
|
||||
use tracing::instrument;
|
||||
|
||||
use super::{builder::AppBuilder, context::AppContextTrait};
|
||||
use crate::{
|
||||
auth::webui_auth_middleware,
|
||||
errors::{RecorderError, RecorderResult},
|
||||
web::{
|
||||
controller::{self, core::ControllerTrait},
|
||||
@ -58,13 +60,19 @@ impl App {
|
||||
controller::oidc::create(context.clone()),
|
||||
controller::metadata::create(context.clone()),
|
||||
controller::r#static::create(context.clone()),
|
||||
controller::feeds::create(context.clone()),
|
||||
controller::feeds::create(context.clone())
|
||||
)?;
|
||||
|
||||
for c in [graphql_c, oidc_c, metadata_c, static_c, feeds_c] {
|
||||
router = c.apply_to(router);
|
||||
}
|
||||
|
||||
router = router
|
||||
.fallback_service(
|
||||
ServeDir::new("webui").not_found_service(ServeFile::new("webui/index.html")),
|
||||
)
|
||||
.layer(from_fn_with_state(context.clone(), webui_auth_middleware));
|
||||
|
||||
let middlewares = default_middleware_stack(context.clone());
|
||||
for mid in middlewares {
|
||||
if mid.is_enabled() {
|
||||
@ -99,26 +107,12 @@ impl App {
|
||||
Ok::<(), RecorderError>(())
|
||||
},
|
||||
async {
|
||||
{
|
||||
let monitor = task.setup_monitor().await?;
|
||||
if graceful_shutdown {
|
||||
monitor
|
||||
.run_with_signal(async move {
|
||||
Self::shutdown_signal().await;
|
||||
tracing::info!("apalis shutting down...");
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
} else {
|
||||
monitor.run().await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok::<(), RecorderError>(())
|
||||
},
|
||||
async {
|
||||
let listener = task.setup_listener().await?;
|
||||
listener.listen().await?;
|
||||
task.run(if graceful_shutdown {
|
||||
Some(Self::shutdown_signal)
|
||||
} else {
|
||||
None
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok::<(), RecorderError>(())
|
||||
}
|
||||
|
@ -7,7 +7,10 @@ use axum::{
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
|
||||
use crate::{app::AppContextTrait, auth::AuthServiceTrait};
|
||||
use crate::{
|
||||
app::AppContextTrait,
|
||||
auth::{AuthService, AuthServiceTrait},
|
||||
};
|
||||
|
||||
pub async fn auth_middleware(
|
||||
State(ctx): State<Arc<dyn AppContextTrait>>,
|
||||
@ -38,3 +41,37 @@ pub async fn auth_middleware(
|
||||
|
||||
response
|
||||
}
|
||||
|
||||
pub async fn webui_auth_middleware(
|
||||
State(ctx): State<Arc<dyn AppContextTrait>>,
|
||||
request: Request,
|
||||
next: Next,
|
||||
) -> Response {
|
||||
if (!request.uri().path().starts_with("/api"))
|
||||
&& let AuthService::Basic(auth_service) = ctx.auth()
|
||||
{
|
||||
let (mut parts, body) = request.into_parts();
|
||||
|
||||
let mut response = match auth_service
|
||||
.extract_user_info(ctx.as_ref() as &dyn AppContextTrait, &mut parts)
|
||||
.await
|
||||
{
|
||||
Ok(auth_user_info) => {
|
||||
let mut request = Request::from_parts(parts, body);
|
||||
request.extensions_mut().insert(auth_user_info);
|
||||
next.run(request).await
|
||||
}
|
||||
Err(auth_error) => auth_error.into_response(),
|
||||
};
|
||||
|
||||
if let Some(header_value) = auth_service.www_authenticate_header_value() {
|
||||
response
|
||||
.headers_mut()
|
||||
.insert(header::WWW_AUTHENTICATE, header_value);
|
||||
};
|
||||
|
||||
response
|
||||
} else {
|
||||
next.run(request).await
|
||||
}
|
||||
}
|
||||
|
@ -7,5 +7,5 @@ pub mod service;
|
||||
|
||||
pub use config::{AuthConfig, BasicAuthConfig, OidcAuthConfig};
|
||||
pub use errors::AuthError;
|
||||
pub use middleware::auth_middleware;
|
||||
pub use middleware::{auth_middleware, webui_auth_middleware};
|
||||
pub use service::{AuthService, AuthServiceTrait, AuthUserInfo};
|
||||
|
@ -21,7 +21,6 @@ use openidconnect::{
|
||||
OAuth2TokenResponse, PkceCodeChallenge, PkceCodeVerifier, RedirectUrl, TokenResponse,
|
||||
core::{CoreAuthenticationFlow, CoreClient, CoreProviderMetadata},
|
||||
};
|
||||
use sea_orm::DbErr;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use snafu::ResultExt;
|
||||
@ -338,9 +337,9 @@ impl AuthServiceTrait for OidcAuthService {
|
||||
}
|
||||
}
|
||||
let subscriber_auth = match crate::models::auth::Model::find_by_pid(ctx, sub).await {
|
||||
Err(RecorderError::DbError {
|
||||
source: DbErr::RecordNotFound(..),
|
||||
}) => crate::models::auth::Model::create_from_oidc(ctx, sub.to_string()).await,
|
||||
Err(RecorderError::ModelEntityNotFound { .. }) => {
|
||||
crate::models::auth::Model::create_from_oidc(ctx, sub.to_string()).await
|
||||
}
|
||||
r => r,
|
||||
}
|
||||
.map_err(|e| {
|
||||
|
@ -18,6 +18,8 @@ use crate::{
|
||||
#[derive(Snafu, Debug)]
|
||||
#[snafu(visibility(pub(crate)))]
|
||||
pub enum RecorderError {
|
||||
#[snafu(transparent)]
|
||||
CronError { source: croner::errors::CronError },
|
||||
#[snafu(display(
|
||||
"HTTP {status} {reason}, source = {source:?}",
|
||||
status = status,
|
||||
@ -120,8 +122,13 @@ pub enum RecorderError {
|
||||
#[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, OptDynErr::some)))]
|
||||
source: OptDynErr,
|
||||
},
|
||||
#[snafu(display("Model Entity {entity} not found or not belong to subscriber"))]
|
||||
ModelEntityNotFound { entity: Cow<'static, str> },
|
||||
#[snafu(display("Model Entity {entity} not found or not belong to subscriber{}", (
|
||||
detail.as_ref().map(|detail| format!(" : {detail}"))).unwrap_or_default()
|
||||
))]
|
||||
ModelEntityNotFound {
|
||||
entity: Cow<'static, str>,
|
||||
detail: Option<String>,
|
||||
},
|
||||
#[snafu(transparent)]
|
||||
FetchError { source: FetchError },
|
||||
#[snafu(display("Credential3rdError: {message}, source = {source}"))]
|
||||
@ -185,9 +192,20 @@ impl RecorderError {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_db_record_not_found<T: ToString>(detail: T) -> Self {
|
||||
Self::DbError {
|
||||
source: sea_orm::DbErr::RecordNotFound(detail.to_string()),
|
||||
pub fn from_model_not_found_detail<C: Into<Cow<'static, str>>, T: ToString>(
|
||||
model: C,
|
||||
detail: T,
|
||||
) -> Self {
|
||||
Self::ModelEntityNotFound {
|
||||
entity: model.into(),
|
||||
detail: Some(detail.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_model_not_found<C: Into<Cow<'static, str>>>(model: C) -> Self {
|
||||
Self::ModelEntityNotFound {
|
||||
entity: model.into(),
|
||||
detail: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -252,9 +270,9 @@ impl IntoResponse for RecorderError {
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
Self::ModelEntityNotFound { entity } => (
|
||||
merr @ Self::ModelEntityNotFound { .. } => (
|
||||
StatusCode::NOT_FOUND,
|
||||
Json::<StandardErrorResponse>(StandardErrorResponse::from(entity.to_string())),
|
||||
Json::<StandardErrorResponse>(StandardErrorResponse::from(merr.to_string())),
|
||||
)
|
||||
.into_response(),
|
||||
err => (
|
||||
|
@ -167,6 +167,7 @@ impl ForwardedRelatedInfo {
|
||||
.as_ref()
|
||||
.and_then(|s| s.host.as_deref())
|
||||
.or(self.x_forwarded_host.as_deref())
|
||||
.or(self.host.as_deref())
|
||||
.or(self.uri.host())
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,7 @@ use fetch::{HttpClient, HttpClientTrait};
|
||||
use maplit::hashmap;
|
||||
use scraper::{Html, Selector};
|
||||
use sea_orm::{
|
||||
ActiveModelTrait, ActiveValue::Set, ColumnTrait, DbErr, EntityTrait, QueryFilter, TryIntoModel,
|
||||
ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, QueryFilter, TryIntoModel,
|
||||
};
|
||||
use url::Url;
|
||||
use util::OptDynErr;
|
||||
@ -227,8 +227,9 @@ impl MikanClient {
|
||||
self.fork_with_userpass_credential(userpass_credential)
|
||||
.await
|
||||
} else {
|
||||
Err(RecorderError::from_db_record_not_found(
|
||||
DbErr::RecordNotFound(format!("credential={credential_id} not found")),
|
||||
Err(RecorderError::from_model_not_found_detail(
|
||||
"credential",
|
||||
format!("credential id {credential_id} not found"),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
@ -801,11 +801,6 @@ pub async fn scrape_mikan_poster_meta_from_image_url(
|
||||
.write(storage_path.clone(), poster_data)
|
||||
.await?;
|
||||
|
||||
tracing::warn!(
|
||||
poster_str = poster_str.to_string(),
|
||||
"mikan poster meta extracted"
|
||||
);
|
||||
|
||||
MikanBangumiPosterMeta {
|
||||
origin_poster_src: origin_poster_src_url,
|
||||
poster_src: Some(poster_str.to_string()),
|
||||
|
@ -47,8 +47,27 @@ impl<'a> EpisodeComp<'a> {
|
||||
Ok((input, f32::round(num) as i32))
|
||||
}
|
||||
|
||||
fn parse_ep_special_num(input: &'a str) -> IResult<&'a str, i32> {
|
||||
terminated(
|
||||
alt((
|
||||
value(0, tag_no_case("ova")),
|
||||
value(0, tag_no_case("oad")),
|
||||
value(0, tag_no_case("sp")),
|
||||
value(0, tag_no_case("ex")),
|
||||
)),
|
||||
(space0, opt(parse_int::<i32>)),
|
||||
)
|
||||
.parse(input)
|
||||
}
|
||||
|
||||
fn parse_ep_num(input: &'a str) -> IResult<&'a str, i32> {
|
||||
alt((parse_int::<i32>, Self::parse_ep_round_num, ZhNum::parse_int)).parse(input)
|
||||
alt((
|
||||
parse_int::<i32>,
|
||||
Self::parse_ep_round_num,
|
||||
ZhNum::parse_int,
|
||||
Self::parse_ep_special_num,
|
||||
))
|
||||
.parse(input)
|
||||
}
|
||||
|
||||
fn parse_ep_nums_core(input: &'a str) -> IResult<&'a str, (i32, Option<i32>)> {
|
||||
@ -175,8 +194,13 @@ impl<'a> std::fmt::Debug for MoiveComp<'a> {
|
||||
impl<'a> OriginCompTrait<'a> for MoiveComp<'a> {
|
||||
#[cfg_attr(debug_assertions, instrument(level = Level::TRACE, ret, err(level=Level::TRACE), "MoiveComp::parse_comp"))]
|
||||
fn parse_comp(input: &'a str) -> IResult<&'a str, Self> {
|
||||
let (input, source) =
|
||||
alt((tag("剧场版"), tag("电影"), tag_no_case("movie"))).parse(input)?;
|
||||
let (input, source) = alt((
|
||||
tag("剧场版"),
|
||||
tag("电影"),
|
||||
tag_no_case("movie"),
|
||||
tag_no_case("film"),
|
||||
))
|
||||
.parse(input)?;
|
||||
Ok((
|
||||
input,
|
||||
Self {
|
||||
|
@ -93,9 +93,7 @@ pub fn register_subscriber_tasks_entity_mutations(
|
||||
.into_tuple::<String>()
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| RecorderError::ModelEntityNotFound {
|
||||
entity: "SubscriberTask".into(),
|
||||
})?;
|
||||
.ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?;
|
||||
|
||||
let task = app_ctx.task();
|
||||
task.retry_subscriber_task(job_id.clone()).await?;
|
||||
@ -104,9 +102,7 @@ pub fn register_subscriber_tasks_entity_mutations(
|
||||
.filter(subscriber_tasks::Column::Id.eq(&job_id))
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| RecorderError::ModelEntityNotFound {
|
||||
entity: "SubscriberTask".into(),
|
||||
})?;
|
||||
.ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?;
|
||||
|
||||
Ok::<_, RecorderError>(Some(FieldValue::owned_any(task_model)))
|
||||
})
|
||||
|
@ -63,9 +63,7 @@ pub fn register_subscriptions_to_schema_builder(
|
||||
.filter(filters_condition)
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| RecorderError::ModelEntityNotFound {
|
||||
entity: "Subscription".into(),
|
||||
})?;
|
||||
.ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?;
|
||||
|
||||
let subscription =
|
||||
subscriptions::Subscription::try_from_model(&subscription_model)?;
|
||||
@ -85,9 +83,7 @@ pub fn register_subscriptions_to_schema_builder(
|
||||
.filter(subscriber_tasks::Column::Id.eq(task_id.to_string()))
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| RecorderError::ModelEntityNotFound {
|
||||
entity: "SubscriberTask".into(),
|
||||
})?;
|
||||
.ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?;
|
||||
|
||||
Ok(Some(FieldValue::owned_any(task_model)))
|
||||
})
|
||||
@ -121,9 +117,7 @@ pub fn register_subscriptions_to_schema_builder(
|
||||
.filter(filters_condition)
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| RecorderError::ModelEntityNotFound {
|
||||
entity: "Subscription".into(),
|
||||
})?;
|
||||
.ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?;
|
||||
|
||||
let subscription =
|
||||
subscriptions::Subscription::try_from_model(&subscription_model)?;
|
||||
@ -141,9 +135,7 @@ pub fn register_subscriptions_to_schema_builder(
|
||||
.filter(subscriber_tasks::Column::Id.eq(task_id.to_string()))
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| RecorderError::ModelEntityNotFound {
|
||||
entity: "SubscriberTask".into(),
|
||||
})?;
|
||||
.ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?;
|
||||
|
||||
Ok(Some(FieldValue::owned_any(task_model)))
|
||||
})
|
||||
@ -178,9 +170,7 @@ pub fn register_subscriptions_to_schema_builder(
|
||||
.filter(filters_condition)
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| RecorderError::ModelEntityNotFound {
|
||||
entity: "Subscription".into(),
|
||||
})?;
|
||||
.ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?;
|
||||
|
||||
let subscription =
|
||||
subscriptions::Subscription::try_from_model(&subscription_model)?;
|
||||
@ -198,9 +188,7 @@ pub fn register_subscriptions_to_schema_builder(
|
||||
.filter(subscriber_tasks::Column::Id.eq(task_id.to_string()))
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| RecorderError::ModelEntityNotFound {
|
||||
entity: "SubscriberTask".into(),
|
||||
})?;
|
||||
.ok_or_else(|| RecorderError::from_model_not_found("SubscriberTask"))?;
|
||||
|
||||
Ok(Some(FieldValue::owned_any(task_model)))
|
||||
})
|
||||
|
@ -7,7 +7,8 @@
|
||||
async_fn_traits,
|
||||
error_generic_member_access,
|
||||
associated_type_defaults,
|
||||
let_chains
|
||||
let_chains,
|
||||
impl_trait_in_fn_trait_return
|
||||
)]
|
||||
#![allow(clippy::enum_variant_names)]
|
||||
pub use downloader;
|
||||
|
@ -171,6 +171,27 @@ pub enum Feeds {
|
||||
SubscriptionId,
|
||||
}
|
||||
|
||||
#[derive(DeriveIden)]
|
||||
pub enum Cron {
|
||||
Table,
|
||||
Id,
|
||||
CronSource,
|
||||
SubscriberId,
|
||||
SubscriptionId,
|
||||
CronExpr,
|
||||
NextRun,
|
||||
LastRun,
|
||||
LastError,
|
||||
Enabled,
|
||||
LockedBy,
|
||||
LockedAt,
|
||||
TimeoutMs,
|
||||
Attempts,
|
||||
MaxAttempts,
|
||||
Priority,
|
||||
Status,
|
||||
}
|
||||
|
||||
macro_rules! create_postgres_enum_for_active_enum {
|
||||
($manager: expr, $active_enum: expr, $($enum_value:expr),+) => {
|
||||
{
|
||||
|
@ -0,0 +1,62 @@
|
||||
use async_trait::async_trait;
|
||||
use sea_orm_migration::prelude::*;
|
||||
|
||||
use crate::task::SUBSCRIBER_TASK_APALIS_NAME;
|
||||
|
||||
#[derive(DeriveMigrationName)]
|
||||
pub struct Migration;
|
||||
|
||||
#[async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
let db = manager.get_connection();
|
||||
|
||||
db.execute_unprepared(&format!(
|
||||
r#"CREATE OR REPLACE VIEW subscriber_tasks AS
|
||||
SELECT
|
||||
job,
|
||||
job_type,
|
||||
status,
|
||||
(job ->> 'subscriber_id'::text)::integer AS subscriber_id,
|
||||
job ->> 'task_type'::text AS task_type,
|
||||
id,
|
||||
attempts,
|
||||
max_attempts,
|
||||
run_at,
|
||||
last_error,
|
||||
lock_at,
|
||||
lock_by,
|
||||
done_at,
|
||||
priority,
|
||||
(job ->> 'subscription_id'::text)::integer AS subscription_id
|
||||
FROM apalis.jobs
|
||||
WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}'
|
||||
AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
|
||||
AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#,
|
||||
))
|
||||
.await?;
|
||||
|
||||
db.execute_unprepared(&format!(
|
||||
r#"CREATE INDEX IF NOT EXISTS idx_apalis_jobs_subscription_id
|
||||
ON apalis.jobs (((job -> 'subscription_id')::integer))
|
||||
WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}'
|
||||
AND jsonb_path_exists(job, '$.subscription_id ? (@.type() == "number")')
|
||||
AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#
|
||||
))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
let db = manager.get_connection();
|
||||
|
||||
db.execute_unprepared(
|
||||
r#"DROP INDEX IF EXISTS idx_apalis_jobs_subscription_id
|
||||
ON apalis.jobs"#,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
244
apps/recorder/src/migrations/m20250629_065628_add_cron.rs
Normal file
244
apps/recorder/src/migrations/m20250629_065628_add_cron.rs
Normal file
@ -0,0 +1,244 @@
|
||||
use async_trait::async_trait;
|
||||
use sea_orm::ActiveEnum;
|
||||
use sea_orm_migration::{prelude::*, schema::*};
|
||||
|
||||
use crate::{
|
||||
migrations::defs::{
|
||||
Cron, CustomSchemaManagerExt, GeneralIds, Subscribers, Subscriptions, table_auto_z,
|
||||
},
|
||||
models::cron::{
|
||||
CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, CronSource, CronSourceEnum,
|
||||
CronStatus, CronStatusEnum, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME,
|
||||
NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME,
|
||||
},
|
||||
};
|
||||
|
||||
#[derive(DeriveMigrationName)]
|
||||
pub struct Migration;
|
||||
|
||||
#[async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
create_postgres_enum_for_active_enum!(manager, CronSourceEnum, CronSource::Subscription)
|
||||
.await?;
|
||||
|
||||
create_postgres_enum_for_active_enum!(
|
||||
manager,
|
||||
CronStatusEnum,
|
||||
CronStatus::Pending,
|
||||
CronStatus::Running,
|
||||
CronStatus::Completed,
|
||||
CronStatus::Failed
|
||||
)
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.create_table(
|
||||
table_auto_z(Cron::Table)
|
||||
.col(pk_auto(Cron::Id))
|
||||
.col(string(Cron::CronExpr))
|
||||
.col(enumeration(
|
||||
Cron::CronSource,
|
||||
CronSourceEnum,
|
||||
CronSource::iden_values(),
|
||||
))
|
||||
.col(integer_null(Cron::SubscriberId))
|
||||
.col(integer_null(Cron::SubscriptionId))
|
||||
.col(timestamp_with_time_zone_null(Cron::NextRun))
|
||||
.col(timestamp_with_time_zone_null(Cron::LastRun))
|
||||
.col(string_null(Cron::LastError))
|
||||
.col(boolean(Cron::Enabled).default(true))
|
||||
.col(string_null(Cron::LockedBy))
|
||||
.col(timestamp_with_time_zone_null(Cron::LockedAt))
|
||||
.col(integer_null(Cron::TimeoutMs))
|
||||
.col(integer(Cron::Attempts))
|
||||
.col(integer(Cron::MaxAttempts))
|
||||
.col(integer(Cron::Priority))
|
||||
.col(enumeration(
|
||||
Cron::Status,
|
||||
CronStatusEnum,
|
||||
CronStatus::iden_values(),
|
||||
))
|
||||
.foreign_key(
|
||||
ForeignKey::create()
|
||||
.name("fk_cron_subscriber_id")
|
||||
.from(Cron::Table, Cron::SubscriberId)
|
||||
.to(Subscribers::Table, Subscribers::Id)
|
||||
.on_delete(ForeignKeyAction::Cascade)
|
||||
.on_update(ForeignKeyAction::Cascade),
|
||||
)
|
||||
.foreign_key(
|
||||
ForeignKey::create()
|
||||
.name("fk_cron_subscription_id")
|
||||
.from(Cron::Table, Cron::SubscriptionId)
|
||||
.to(Subscriptions::Table, Subscriptions::Id)
|
||||
.on_delete(ForeignKeyAction::Cascade)
|
||||
.on_update(ForeignKeyAction::Cascade),
|
||||
)
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.create_postgres_auto_update_ts_trigger_for_col(Cron::Table, GeneralIds::UpdatedAt)
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.create_index(
|
||||
IndexCreateStatement::new()
|
||||
.if_not_exists()
|
||||
.name("idx_cron_cron_source")
|
||||
.table(Cron::Table)
|
||||
.col(Cron::CronSource)
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.create_index(
|
||||
IndexCreateStatement::new()
|
||||
.if_not_exists()
|
||||
.name("idx_cron_next_run")
|
||||
.table(Cron::Table)
|
||||
.col(Cron::NextRun)
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let db = manager.get_connection();
|
||||
|
||||
db.execute_unprepared(&format!(
|
||||
r#"CREATE OR REPLACE FUNCTION {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}() RETURNS trigger AS $$
|
||||
BEGIN
|
||||
-- Check if the cron is due to run
|
||||
IF NEW.{next_run} IS NOT NULL
|
||||
AND NEW.{next_run} <= CURRENT_TIMESTAMP
|
||||
AND NEW.{enabled} = true
|
||||
AND NEW.{status} = '{pending}'
|
||||
AND NEW.{attempts} < NEW.{max_attempts}
|
||||
-- Check if not locked or lock timeout
|
||||
AND (
|
||||
NEW.{locked_at} IS NULL
|
||||
OR (
|
||||
NEW.{timeout_ms} IS NOT NULL
|
||||
AND (NEW.{locked_at} + NEW.{timeout_ms} * INTERVAL '1 millisecond') <= CURRENT_TIMESTAMP
|
||||
)
|
||||
)
|
||||
-- Make sure the cron is a new due event, not a repeat event
|
||||
AND (
|
||||
OLD.{next_run} IS NULL
|
||||
OR OLD.{next_run} > CURRENT_TIMESTAMP
|
||||
OR OLD.{enabled} = false
|
||||
OR OLD.{status} != '{pending}'
|
||||
OR OLD.{attempts} != NEW.{attempts}
|
||||
)
|
||||
THEN
|
||||
PERFORM pg_notify('{CRON_DUE_EVENT}', row_to_json(NEW)::text);
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;"#,
|
||||
next_run = &Cron::NextRun.to_string(),
|
||||
enabled = &Cron::Enabled.to_string(),
|
||||
locked_at = &Cron::LockedAt.to_string(),
|
||||
timeout_ms = &Cron::TimeoutMs.to_string(),
|
||||
status = &Cron::Status.to_string(),
|
||||
pending = &CronStatus::Pending.to_value(),
|
||||
attempts = &Cron::Attempts.to_string(),
|
||||
max_attempts = &Cron::MaxAttempts.to_string(),
|
||||
))
|
||||
.await?;
|
||||
|
||||
db.execute_unprepared(&format!(
|
||||
r#"CREATE OR REPLACE TRIGGER {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME}
|
||||
AFTER INSERT OR UPDATE ON {table}
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}();"#,
|
||||
table = &Cron::Table.to_string(),
|
||||
))
|
||||
.await?;
|
||||
|
||||
db.execute_unprepared(&format!(
|
||||
r#"CREATE OR REPLACE FUNCTION {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}() RETURNS INTEGER AS $$
|
||||
DECLARE
|
||||
cron_record RECORD;
|
||||
notification_count INTEGER := 0;
|
||||
BEGIN
|
||||
FOR cron_record IN
|
||||
SELECT * FROM {table}
|
||||
WHERE {next_run} IS NOT NULL
|
||||
AND {next_run} <= CURRENT_TIMESTAMP
|
||||
AND {enabled} = true
|
||||
AND {status} = '{pending}'
|
||||
AND {attempts} < {max_attempts}
|
||||
AND (
|
||||
{locked_at} IS NULL
|
||||
OR (
|
||||
{timeout_ms} IS NOT NULL
|
||||
AND {locked_at} + {timeout_ms} * INTERVAL '1 millisecond' <= CURRENT_TIMESTAMP
|
||||
)
|
||||
)
|
||||
ORDER BY {priority} ASC, {next_run} ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
LOOP
|
||||
PERFORM pg_notify('{CRON_DUE_EVENT}', row_to_json(cron_record)::text);
|
||||
notification_count := notification_count + 1;
|
||||
END LOOP;
|
||||
RETURN notification_count;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;"#,
|
||||
table = &Cron::Table.to_string(),
|
||||
next_run = &Cron::NextRun.to_string(),
|
||||
enabled = &Cron::Enabled.to_string(),
|
||||
status = &Cron::Status.to_string(),
|
||||
pending = &CronStatus::Pending.to_value(),
|
||||
locked_at = &Cron::LockedAt.to_string(),
|
||||
timeout_ms = &Cron::TimeoutMs.to_string(),
|
||||
priority = &Cron::Priority.to_string(),
|
||||
attempts = &Cron::Attempts.to_string(),
|
||||
max_attempts = &Cron::MaxAttempts.to_string(),
|
||||
))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
let db = manager.get_connection();
|
||||
|
||||
db.execute_unprepared(&format!(
|
||||
r#"DROP TRIGGER IF EXISTS {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME} ON {table};"#,
|
||||
table = &Cron::Table.to_string(),
|
||||
))
|
||||
.await?;
|
||||
|
||||
db.execute_unprepared(&format!(
|
||||
r#"DROP FUNCTION IF EXISTS {NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME}();"#,
|
||||
))
|
||||
.await?;
|
||||
|
||||
db.execute_unprepared(&format!(
|
||||
r#"DROP FUNCTION IF EXISTS {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}();"#,
|
||||
))
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.drop_table(
|
||||
TableDropStatement::new()
|
||||
.if_exists()
|
||||
.table(Cron::Table)
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.drop_postgres_enum_for_active_enum(CronSourceEnum)
|
||||
.await?;
|
||||
|
||||
manager
|
||||
.drop_postgres_enum_for_active_enum(CronStatusEnum)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -10,6 +10,8 @@ pub mod m20250501_021523_credential_3rd;
|
||||
pub mod m20250520_021135_subscriber_tasks;
|
||||
pub mod m20250622_015618_feeds;
|
||||
pub mod m20250622_020819_bangumi_and_episode_type;
|
||||
pub mod m20250625_060701_add_subscription_id_to_subscriber_tasks;
|
||||
pub mod m20250629_065628_add_cron;
|
||||
|
||||
pub struct Migrator;
|
||||
|
||||
@ -24,6 +26,8 @@ impl MigratorTrait for Migrator {
|
||||
Box::new(m20250520_021135_subscriber_tasks::Migration),
|
||||
Box::new(m20250622_015618_feeds::Migration),
|
||||
Box::new(m20250622_020819_bangumi_and_episode_type::Migration),
|
||||
Box::new(m20250625_060701_add_subscription_id_to_subscriber_tasks::Migration),
|
||||
Box::new(m20250629_065628_add_cron::Migration),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
@ -63,7 +63,9 @@ impl Model {
|
||||
.filter(Column::Pid.eq(pid))
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| RecorderError::from_db_record_not_found("auth::find_by_pid"))?;
|
||||
.ok_or_else(|| {
|
||||
RecorderError::from_model_not_found_detail("auth", format!("pid {pid} not found"))
|
||||
})?;
|
||||
Ok(subscriber_auth)
|
||||
}
|
||||
|
||||
|
7
apps/recorder/src/models/cron/core.rs
Normal file
7
apps/recorder/src/models/cron/core.rs
Normal file
@ -0,0 +1,7 @@
|
||||
pub const CRON_DUE_EVENT: &str = "cron_due";
|
||||
|
||||
pub const CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME: &str = "check_and_trigger_due_crons";
|
||||
|
||||
pub const NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME: &str = "notify_due_cron_when_mutating";
|
||||
pub const NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME: &str =
|
||||
"notify_due_cron_when_mutating_trigger";
|
291
apps/recorder/src/models/cron/mod.rs
Normal file
291
apps/recorder/src/models/cron/mod.rs
Normal file
@ -0,0 +1,291 @@
|
||||
mod core;
|
||||
mod registry;
|
||||
|
||||
pub use core::{
|
||||
CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT,
|
||||
NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use croner::Cron;
|
||||
use sea_orm::{
|
||||
ActiveValue::Set, DeriveActiveEnum, DeriveDisplay, DeriveEntityModel, EnumIter, QuerySelect,
|
||||
Statement, TransactionTrait, entity::prelude::*, sea_query::LockType,
|
||||
sqlx::postgres::PgNotification,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
app::AppContextTrait,
|
||||
errors::{RecorderError, RecorderResult},
|
||||
models::subscriptions::{self},
|
||||
};
|
||||
|
||||
#[derive(
|
||||
Debug, Clone, PartialEq, Eq, DeriveActiveEnum, EnumIter, DeriveDisplay, Serialize, Deserialize,
|
||||
)]
|
||||
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "cron_source")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CronSource {
|
||||
#[sea_orm(string_value = "subscription")]
|
||||
Subscription,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Debug, Clone, PartialEq, Eq, DeriveActiveEnum, EnumIter, DeriveDisplay, Serialize, Deserialize,
|
||||
)]
|
||||
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "cron_status")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CronStatus {
|
||||
#[sea_orm(string_value = "pending")]
|
||||
Pending,
|
||||
#[sea_orm(string_value = "running")]
|
||||
Running,
|
||||
#[sea_orm(string_value = "completed")]
|
||||
Completed,
|
||||
#[sea_orm(string_value = "failed")]
|
||||
Failed,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
|
||||
#[sea_orm(table_name = "cron")]
|
||||
pub struct Model {
|
||||
#[sea_orm(default_expr = "Expr::current_timestamp()")]
|
||||
pub created_at: DateTimeUtc,
|
||||
#[sea_orm(default_expr = "Expr::current_timestamp()")]
|
||||
pub updated_at: DateTimeUtc,
|
||||
#[sea_orm(primary_key)]
|
||||
pub id: i32,
|
||||
pub cron_source: CronSource,
|
||||
pub subscriber_id: Option<i32>,
|
||||
pub subscription_id: Option<i32>,
|
||||
pub cron_expr: String,
|
||||
pub next_run: Option<DateTimeUtc>,
|
||||
pub last_run: Option<DateTimeUtc>,
|
||||
pub last_error: Option<String>,
|
||||
pub locked_by: Option<String>,
|
||||
pub locked_at: Option<DateTimeUtc>,
|
||||
pub timeout_ms: i32,
|
||||
#[sea_orm(default_expr = "0")]
|
||||
pub attempts: i32,
|
||||
#[sea_orm(default_expr = "1")]
|
||||
pub max_attempts: i32,
|
||||
#[sea_orm(default_expr = "0")]
|
||||
pub priority: i32,
|
||||
pub status: CronStatus,
|
||||
#[sea_orm(default_expr = "true")]
|
||||
pub enabled: bool,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
pub enum Relation {
|
||||
#[sea_orm(
|
||||
belongs_to = "super::subscribers::Entity",
|
||||
from = "Column::SubscriberId",
|
||||
to = "super::subscribers::Column::Id",
|
||||
on_update = "Cascade",
|
||||
on_delete = "Cascade"
|
||||
)]
|
||||
Subscriber,
|
||||
#[sea_orm(
|
||||
belongs_to = "super::subscriptions::Entity",
|
||||
from = "Column::SubscriptionId",
|
||||
to = "super::subscriptions::Column::Id",
|
||||
on_update = "Cascade",
|
||||
on_delete = "Cascade"
|
||||
)]
|
||||
Subscription,
|
||||
}
|
||||
|
||||
impl Related<super::subscribers::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::Subscriber.def()
|
||||
}
|
||||
}
|
||||
|
||||
impl Related<super::subscriptions::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::Subscription.def()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
|
||||
pub enum RelatedEntity {
|
||||
#[sea_orm(entity = "super::subscribers::Entity")]
|
||||
Subscriber,
|
||||
#[sea_orm(entity = "super::subscriptions::Entity")]
|
||||
Subscription,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
|
||||
impl Model {
|
||||
pub async fn handle_cron_notification(
|
||||
ctx: &dyn AppContextTrait,
|
||||
notification: PgNotification,
|
||||
worker_id: &str,
|
||||
) -> RecorderResult<()> {
|
||||
let payload: Self = serde_json::from_str(notification.payload())?;
|
||||
let cron_id = payload.id;
|
||||
|
||||
tracing::debug!("Cron notification received for cron {cron_id} and worker {worker_id}");
|
||||
|
||||
match Self::try_acquire_lock_with_cron_id(ctx, cron_id, worker_id).await? {
|
||||
Some(cron) => match cron.exec_cron(ctx).await {
|
||||
Ok(()) => {
|
||||
tracing::debug!("Cron {cron_id} executed successfully");
|
||||
cron.mark_cron_completed(ctx).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Error executing cron {cron_id}: {e}");
|
||||
cron.mark_cron_failed(ctx, &e.to_string()).await?;
|
||||
}
|
||||
},
|
||||
None => {
|
||||
tracing::debug!(
|
||||
"Cron lock not acquired for cron {cron_id} and worker {worker_id}, skipping..."
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn try_acquire_lock_with_cron_id(
|
||||
ctx: &dyn AppContextTrait,
|
||||
cron_id: i32,
|
||||
worker_id: &str,
|
||||
) -> RecorderResult<Option<Self>> {
|
||||
let db = ctx.db();
|
||||
let txn = db.begin().await?;
|
||||
|
||||
let cron = Entity::find_by_id(cron_id)
|
||||
.lock(LockType::Update)
|
||||
.one(&txn)
|
||||
.await?;
|
||||
|
||||
if let Some(cron) = cron {
|
||||
if cron.enabled
|
||||
&& cron.attempts < cron.max_attempts
|
||||
&& cron.status == CronStatus::Pending
|
||||
&& (cron.locked_at.is_none_or(|locked_at| {
|
||||
locked_at + chrono::Duration::milliseconds(cron.timeout_ms as i64) <= Utc::now()
|
||||
}))
|
||||
&& cron.next_run.is_some_and(|next_run| next_run <= Utc::now())
|
||||
{
|
||||
let cron_active_model = ActiveModel {
|
||||
id: Set(cron.id),
|
||||
locked_by: Set(Some(worker_id.to_string())),
|
||||
locked_at: Set(Some(Utc::now())),
|
||||
status: Set(CronStatus::Running),
|
||||
attempts: Set(cron.attempts + 1),
|
||||
..Default::default()
|
||||
};
|
||||
let cron_model = cron_active_model.update(&txn).await?;
|
||||
txn.commit().await?;
|
||||
return Ok(Some(cron_model));
|
||||
}
|
||||
txn.commit().await?;
|
||||
return Ok(Some(cron));
|
||||
}
|
||||
txn.rollback().await?;
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn exec_cron(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> {
|
||||
match self.cron_source {
|
||||
CronSource::Subscription => {
|
||||
let subscription_id = self.subscription_id.unwrap_or_else(|| {
|
||||
unreachable!("Subscription cron must have a subscription id")
|
||||
});
|
||||
|
||||
let subscription = subscriptions::Entity::find_by_id(subscription_id)
|
||||
.one(ctx.db())
|
||||
.await?
|
||||
.ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?;
|
||||
|
||||
subscription.exec_cron(ctx).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn mark_cron_completed(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> {
|
||||
let db = ctx.db();
|
||||
|
||||
let next_run = self.calculate_next_run(&self.cron_expr)?;
|
||||
|
||||
ActiveModel {
|
||||
id: Set(self.id),
|
||||
next_run: Set(Some(next_run)),
|
||||
last_run: Set(Some(Utc::now())),
|
||||
status: Set(CronStatus::Pending),
|
||||
locked_by: Set(None),
|
||||
locked_at: Set(None),
|
||||
attempts: Set(0),
|
||||
last_error: Set(None),
|
||||
..Default::default()
|
||||
}
|
||||
.update(db)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn mark_cron_failed(&self, ctx: &dyn AppContextTrait, error: &str) -> RecorderResult<()> {
|
||||
let db = ctx.db();
|
||||
|
||||
let should_retry = self.attempts < self.max_attempts;
|
||||
|
||||
let status = if should_retry {
|
||||
CronStatus::Pending
|
||||
} else {
|
||||
CronStatus::Failed
|
||||
};
|
||||
|
||||
let next_run = if should_retry {
|
||||
Some(Utc::now() + chrono::Duration::seconds(5))
|
||||
} else {
|
||||
Some(self.calculate_next_run(&self.cron_expr)?)
|
||||
};
|
||||
|
||||
ActiveModel {
|
||||
id: Set(self.id),
|
||||
next_run: Set(next_run),
|
||||
status: Set(status),
|
||||
locked_by: Set(None),
|
||||
locked_at: Set(None),
|
||||
last_run: Set(Some(Utc::now())),
|
||||
last_error: Set(Some(error.to_string())),
|
||||
attempts: Set(if should_retry { self.attempts + 1 } else { 0 }),
|
||||
..Default::default()
|
||||
}
|
||||
.update(db)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn check_and_trigger_due_crons(ctx: &dyn AppContextTrait) -> RecorderResult<()> {
|
||||
let db = ctx.db();
|
||||
|
||||
db.execute(Statement::from_string(
|
||||
db.get_database_backend(),
|
||||
format!("SELECT {CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME}()"),
|
||||
))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn calculate_next_run(&self, cron_expr: &str) -> RecorderResult<DateTime<Utc>> {
|
||||
let cron_expr = Cron::new(cron_expr).parse()?;
|
||||
|
||||
let next = cron_expr.find_next_occurrence(&Utc::now(), false)?;
|
||||
|
||||
Ok(next)
|
||||
}
|
||||
}
|
1
apps/recorder/src/models/cron/registry.rs
Normal file
1
apps/recorder/src/models/cron/registry.rs
Normal file
@ -0,0 +1 @@
|
||||
|
@ -129,7 +129,7 @@ pub enum RelatedEntity {
|
||||
}
|
||||
|
||||
impl ActiveModel {
|
||||
#[tracing::instrument(err, skip(ctx), fields(bangumi_id = ?bangumi.id, mikan_episode_id = ?episode.mikan_episode_id))]
|
||||
#[tracing::instrument(err, skip_all, fields(bangumi_id = ?bangumi.id, mikan_episode_id = ?episode.mikan_episode_id))]
|
||||
pub fn from_mikan_bangumi_and_episode_meta(
|
||||
ctx: &dyn AppContextTrait,
|
||||
bangumi: &bangumi::Model,
|
||||
|
@ -122,9 +122,7 @@ impl Model {
|
||||
.filter(Column::FeedType.eq(FeedType::Rss))
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or(RecorderError::ModelEntityNotFound {
|
||||
entity: "Feed".into(),
|
||||
})?;
|
||||
.ok_or(RecorderError::from_model_not_found("Feed"))?;
|
||||
|
||||
let feed = Feed::from_model(ctx, feed_model).await?;
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
use rss::Channel;
|
||||
use sea_orm::{ColumnTrait, EntityTrait, JoinType, QueryFilter, QuerySelect, RelationTrait};
|
||||
use sea_orm::{
|
||||
ColumnTrait, EntityTrait, JoinType, Order, QueryFilter, QueryOrder, QuerySelect, RelationTrait,
|
||||
};
|
||||
use url::Url;
|
||||
|
||||
use crate::{
|
||||
@ -37,13 +39,12 @@ impl Feed {
|
||||
subscription_episode::Relation::Subscription.def(),
|
||||
)
|
||||
.filter(subscriptions::Column::Id.eq(subscription_id))
|
||||
.order_by(episodes::Column::EnclosurePubDate, Order::Desc)
|
||||
.all(db)
|
||||
.await?;
|
||||
(subscription, episodes)
|
||||
} else {
|
||||
return Err(RecorderError::ModelEntityNotFound {
|
||||
entity: "Subscription".into(),
|
||||
});
|
||||
return Err(RecorderError::from_model_not_found("Subscription"));
|
||||
};
|
||||
|
||||
Ok(Feed::SubscritpionEpisodes(
|
||||
|
@ -11,3 +11,4 @@ pub mod subscribers;
|
||||
pub mod subscription_bangumi;
|
||||
pub mod subscription_episode;
|
||||
pub mod subscriptions;
|
||||
pub mod cron;
|
||||
|
@ -29,6 +29,7 @@ pub struct Model {
|
||||
#[sea_orm(primary_key)]
|
||||
pub id: String,
|
||||
pub subscriber_id: i32,
|
||||
pub subscription_id: Option<i32>,
|
||||
pub job: SubscriberTask,
|
||||
pub task_type: SubscriberTaskType,
|
||||
pub status: SubscriberTaskStatus,
|
||||
@ -52,6 +53,14 @@ pub enum Relation {
|
||||
on_delete = "Cascade"
|
||||
)]
|
||||
Subscriber,
|
||||
#[sea_orm(
|
||||
belongs_to = "super::subscriptions::Entity",
|
||||
from = "Column::SubscriptionId",
|
||||
to = "super::subscriptions::Column::Id",
|
||||
on_update = "NoAction",
|
||||
on_delete = "NoAction"
|
||||
)]
|
||||
Subscription,
|
||||
}
|
||||
|
||||
impl Related<super::subscribers::Entity> for Entity {
|
||||
@ -60,10 +69,18 @@ impl Related<super::subscribers::Entity> for Entity {
|
||||
}
|
||||
}
|
||||
|
||||
impl Related<super::subscriptions::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::Subscription.def()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
|
||||
pub enum RelatedEntity {
|
||||
#[sea_orm(entity = "super::subscribers::Entity")]
|
||||
Subscriber,
|
||||
#[sea_orm(entity = "super::subscriptions::Entity")]
|
||||
Subscription,
|
||||
}
|
||||
|
||||
#[async_trait]
|
@ -130,10 +130,9 @@ impl Model {
|
||||
pub async fn find_by_id(ctx: &dyn AppContextTrait, id: i32) -> RecorderResult<Self> {
|
||||
let db = ctx.db();
|
||||
|
||||
let subscriber = Entity::find_by_id(id)
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| RecorderError::from_db_record_not_found("subscribers::find_by_id"))?;
|
||||
let subscriber = Entity::find_by_id(id).one(db).await?.ok_or_else(|| {
|
||||
RecorderError::from_model_not_found_detail("subscribers", format!("id {id} not found"))
|
||||
})?;
|
||||
Ok(subscriber)
|
||||
}
|
||||
|
||||
|
@ -61,6 +61,8 @@ pub enum Relation {
|
||||
Credential3rd,
|
||||
#[sea_orm(has_many = "super::feeds::Entity")]
|
||||
Feed,
|
||||
#[sea_orm(has_many = "super::subscriber_tasks::Entity")]
|
||||
SubscriberTask,
|
||||
}
|
||||
|
||||
impl Related<super::subscribers::Entity> for Entity {
|
||||
@ -121,6 +123,12 @@ impl Related<super::credential_3rd::Entity> for Entity {
|
||||
}
|
||||
}
|
||||
|
||||
impl Related<super::subscriber_tasks::Entity> for Entity {
|
||||
fn to() -> RelationDef {
|
||||
Relation::SubscriberTask.def()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
|
||||
pub enum RelatedEntity {
|
||||
#[sea_orm(entity = "super::subscribers::Entity")]
|
||||
@ -137,6 +145,8 @@ pub enum RelatedEntity {
|
||||
Credential3rd,
|
||||
#[sea_orm(entity = "super::feeds::Entity")]
|
||||
Feed,
|
||||
#[sea_orm(entity = "super::subscriber_tasks::Entity")]
|
||||
SubscriberTask,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@ -180,16 +190,16 @@ impl Model {
|
||||
let subscription_model = Entity::find_by_id(subscription_id)
|
||||
.one(db)
|
||||
.await?
|
||||
.ok_or_else(|| RecorderError::ModelEntityNotFound {
|
||||
entity: "Subscription".into(),
|
||||
})?;
|
||||
.ok_or_else(|| RecorderError::from_model_not_found("Subscription"))?;
|
||||
|
||||
if subscription_model.subscriber_id != subscriber_id {
|
||||
Err(RecorderError::ModelEntityNotFound {
|
||||
entity: "Subscription".into(),
|
||||
})?;
|
||||
Err(RecorderError::from_model_not_found("Subscription"))?;
|
||||
}
|
||||
|
||||
Ok(subscription_model)
|
||||
}
|
||||
|
||||
pub async fn exec_cron(&self, _ctx: &dyn AppContextTrait) -> RecorderResult<()> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
@ -209,7 +209,7 @@ impl StorageService {
|
||||
lister.try_collect().await
|
||||
}
|
||||
|
||||
#[instrument(skip_all, err, fields(storage_path = %storage_path.as_ref(), range = ?range, accept = ?accept))]
|
||||
#[instrument(skip_all, err, fields(storage_path = %storage_path.as_ref(), range = ?range, accept = accept.to_string()))]
|
||||
pub async fn serve_optimized_image(
|
||||
&self,
|
||||
storage_path: impl AsRef<Path>,
|
||||
|
@ -1,134 +1,18 @@
|
||||
mod media;
|
||||
mod subscriber;
|
||||
mod subscription;
|
||||
use std::sync::Arc;
|
||||
mod system;
|
||||
|
||||
pub use media::OptimizeImageTask;
|
||||
use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult};
|
||||
use serde::{Deserialize, Serialize};
|
||||
pub use subscriber::{
|
||||
SubscriberTask, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant,
|
||||
SubscriberTaskTypeVariantIter,
|
||||
};
|
||||
pub use subscription::{
|
||||
SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask,
|
||||
SyncOneSubscriptionSourcesTask,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
app::AppContextTrait,
|
||||
errors::{RecorderError, RecorderResult},
|
||||
models::subscriptions::SubscriptionTrait,
|
||||
task::AsyncTaskTrait,
|
||||
pub use system::{
|
||||
SystemTask, SystemTaskType, SystemTaskTypeEnum, SystemTaskTypeVariant,
|
||||
SystemTaskTypeVariantIter,
|
||||
};
|
||||
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Copy,
|
||||
DeriveActiveEnum,
|
||||
DeriveDisplay,
|
||||
EnumIter,
|
||||
)]
|
||||
#[sea_orm(rs_type = "String", db_type = "Text")]
|
||||
pub enum SubscriberTaskType {
|
||||
#[serde(rename = "sync_one_subscription_feeds_incremental")]
|
||||
#[sea_orm(string_value = "sync_one_subscription_feeds_incremental")]
|
||||
SyncOneSubscriptionFeedsIncremental,
|
||||
#[serde(rename = "sync_one_subscription_feeds_full")]
|
||||
#[sea_orm(string_value = "sync_one_subscription_feeds_full")]
|
||||
SyncOneSubscriptionFeedsFull,
|
||||
#[serde(rename = "sync_one_subscription_sources")]
|
||||
#[sea_orm(string_value = "sync_one_subscription_sources")]
|
||||
SyncOneSubscriptionSources,
|
||||
}
|
||||
|
||||
impl TryFrom<&SubscriberTask> for serde_json::Value {
|
||||
type Error = RecorderError;
|
||||
|
||||
fn try_from(value: &SubscriberTask) -> Result<Self, Self::Error> {
|
||||
let json_value = serde_json::to_value(value)?;
|
||||
Ok(match json_value {
|
||||
serde_json::Value::Object(mut map) => {
|
||||
map.remove("task_type");
|
||||
serde_json::Value::Object(map)
|
||||
}
|
||||
_ => {
|
||||
unreachable!("subscriber task must be an json object");
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)]
|
||||
#[serde(tag = "task_type")]
|
||||
pub enum SubscriberTask {
|
||||
#[serde(rename = "sync_one_subscription_feeds_incremental")]
|
||||
SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask),
|
||||
#[serde(rename = "sync_one_subscription_feeds_full")]
|
||||
SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask),
|
||||
#[serde(rename = "sync_one_subscription_sources")]
|
||||
SyncOneSubscriptionSources(SyncOneSubscriptionSourcesTask),
|
||||
}
|
||||
|
||||
impl SubscriberTask {
|
||||
pub fn get_subscriber_id(&self) -> i32 {
|
||||
match self {
|
||||
Self::SyncOneSubscriptionFeedsIncremental(task) => task.0.get_subscriber_id(),
|
||||
Self::SyncOneSubscriptionFeedsFull(task) => task.0.get_subscriber_id(),
|
||||
Self::SyncOneSubscriptionSources(task) => task.0.get_subscriber_id(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
|
||||
match self {
|
||||
Self::SyncOneSubscriptionFeedsIncremental(task) => task.run(ctx).await,
|
||||
Self::SyncOneSubscriptionFeedsFull(task) => task.run(ctx).await,
|
||||
Self::SyncOneSubscriptionSources(task) => task.run(ctx).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn task_type(&self) -> SubscriberTaskType {
|
||||
match self {
|
||||
Self::SyncOneSubscriptionFeedsIncremental(_) => {
|
||||
SubscriberTaskType::SyncOneSubscriptionFeedsIncremental
|
||||
}
|
||||
Self::SyncOneSubscriptionFeedsFull(_) => {
|
||||
SubscriberTaskType::SyncOneSubscriptionFeedsFull
|
||||
}
|
||||
Self::SyncOneSubscriptionSources(_) => SubscriberTaskType::SyncOneSubscriptionSources,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Copy,
|
||||
DeriveActiveEnum,
|
||||
DeriveDisplay,
|
||||
EnumIter,
|
||||
)]
|
||||
#[sea_orm(rs_type = "String", db_type = "Text")]
|
||||
pub enum SystemTaskType {
|
||||
#[serde(rename = "optimize_image")]
|
||||
#[sea_orm(string_value = "optimize_image")]
|
||||
OptimizeImage,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, FromJsonQueryResult)]
|
||||
pub enum SystemTask {
|
||||
#[serde(rename = "optimize_image")]
|
||||
OptimizeImage(OptimizeImageTask),
|
||||
}
|
||||
|
||||
impl SystemTask {
|
||||
pub async fn run(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
|
||||
match self {
|
||||
Self::OptimizeImage(task) => task.run(ctx).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
100
apps/recorder/src/task/registry/subscriber.rs
Normal file
100
apps/recorder/src/task/registry/subscriber.rs
Normal file
@ -0,0 +1,100 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
app::AppContextTrait,
|
||||
errors::{RecorderError, RecorderResult},
|
||||
models::subscriptions::SubscriptionTrait,
|
||||
task::{
|
||||
AsyncTaskTrait,
|
||||
registry::{
|
||||
SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask,
|
||||
SyncOneSubscriptionSourcesTask,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Copy,
|
||||
DeriveActiveEnum,
|
||||
DeriveDisplay,
|
||||
EnumIter,
|
||||
)]
|
||||
#[sea_orm(rs_type = "String", db_type = "Text")]
|
||||
pub enum SubscriberTaskType {
|
||||
#[serde(rename = "sync_one_subscription_feeds_incremental")]
|
||||
#[sea_orm(string_value = "sync_one_subscription_feeds_incremental")]
|
||||
SyncOneSubscriptionFeedsIncremental,
|
||||
#[serde(rename = "sync_one_subscription_feeds_full")]
|
||||
#[sea_orm(string_value = "sync_one_subscription_feeds_full")]
|
||||
SyncOneSubscriptionFeedsFull,
|
||||
#[serde(rename = "sync_one_subscription_sources")]
|
||||
#[sea_orm(string_value = "sync_one_subscription_sources")]
|
||||
SyncOneSubscriptionSources,
|
||||
}
|
||||
|
||||
impl TryFrom<&SubscriberTask> for serde_json::Value {
|
||||
type Error = RecorderError;
|
||||
|
||||
fn try_from(value: &SubscriberTask) -> Result<Self, Self::Error> {
|
||||
let json_value = serde_json::to_value(value)?;
|
||||
Ok(match json_value {
|
||||
serde_json::Value::Object(mut map) => {
|
||||
map.remove("task_type");
|
||||
serde_json::Value::Object(map)
|
||||
}
|
||||
_ => {
|
||||
unreachable!("subscriber task must be an json object");
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)]
|
||||
#[serde(tag = "task_type")]
|
||||
pub enum SubscriberTask {
|
||||
#[serde(rename = "sync_one_subscription_feeds_incremental")]
|
||||
SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask),
|
||||
#[serde(rename = "sync_one_subscription_feeds_full")]
|
||||
SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask),
|
||||
#[serde(rename = "sync_one_subscription_sources")]
|
||||
SyncOneSubscriptionSources(SyncOneSubscriptionSourcesTask),
|
||||
}
|
||||
|
||||
impl SubscriberTask {
|
||||
pub fn get_subscriber_id(&self) -> i32 {
|
||||
match self {
|
||||
Self::SyncOneSubscriptionFeedsIncremental(task) => task.0.get_subscriber_id(),
|
||||
Self::SyncOneSubscriptionFeedsFull(task) => task.0.get_subscriber_id(),
|
||||
Self::SyncOneSubscriptionSources(task) => task.0.get_subscriber_id(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
|
||||
match self {
|
||||
Self::SyncOneSubscriptionFeedsIncremental(task) => task.run(ctx).await,
|
||||
Self::SyncOneSubscriptionFeedsFull(task) => task.run(ctx).await,
|
||||
Self::SyncOneSubscriptionSources(task) => task.run(ctx).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn task_type(&self) -> SubscriberTaskType {
|
||||
match self {
|
||||
Self::SyncOneSubscriptionFeedsIncremental(_) => {
|
||||
SubscriberTaskType::SyncOneSubscriptionFeedsIncremental
|
||||
}
|
||||
Self::SyncOneSubscriptionFeedsFull(_) => {
|
||||
SubscriberTaskType::SyncOneSubscriptionFeedsFull
|
||||
}
|
||||
Self::SyncOneSubscriptionSources(_) => SubscriberTaskType::SyncOneSubscriptionSources,
|
||||
}
|
||||
}
|
||||
}
|
43
apps/recorder/src/task/registry/system.rs
Normal file
43
apps/recorder/src/task/registry/system.rs
Normal file
@ -0,0 +1,43 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
app::AppContextTrait,
|
||||
errors::RecorderResult,
|
||||
task::{AsyncTaskTrait, registry::media::OptimizeImageTask},
|
||||
};
|
||||
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Copy,
|
||||
DeriveActiveEnum,
|
||||
DeriveDisplay,
|
||||
EnumIter,
|
||||
)]
|
||||
#[sea_orm(rs_type = "String", db_type = "Text")]
|
||||
pub enum SystemTaskType {
|
||||
#[serde(rename = "optimize_image")]
|
||||
#[sea_orm(string_value = "optimize_image")]
|
||||
OptimizeImage,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, FromJsonQueryResult)]
|
||||
pub enum SystemTask {
|
||||
#[serde(rename = "optimize_image")]
|
||||
OptimizeImage(OptimizeImageTask),
|
||||
}
|
||||
|
||||
impl SystemTask {
|
||||
pub async fn run(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
|
||||
match self {
|
||||
Self::OptimizeImage(task) => task.run(ctx).await,
|
||||
}
|
||||
}
|
||||
}
|
@ -1,16 +1,18 @@
|
||||
use std::{ops::Deref, str::FromStr, sync::Arc};
|
||||
use std::{future::Future, ops::Deref, str::FromStr, sync::Arc};
|
||||
|
||||
use apalis::prelude::*;
|
||||
use apalis_sql::{
|
||||
Config,
|
||||
context::SqlContext,
|
||||
postgres::{PgListen, PostgresStorage},
|
||||
postgres::{PgListen as ApalisPgListen, PostgresStorage as ApalisPostgresStorage},
|
||||
};
|
||||
use sea_orm::sqlx::postgres::PgListener;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::{
|
||||
app::AppContextTrait,
|
||||
errors::{RecorderError, RecorderResult},
|
||||
models::cron::{self, CRON_DUE_EVENT},
|
||||
task::{
|
||||
SUBSCRIBER_TASK_APALIS_NAME, SYSTEM_TASK_APALIS_NAME, SubscriberTask, TaskConfig,
|
||||
config::{default_subscriber_task_workers, default_system_task_workers},
|
||||
@ -21,8 +23,9 @@ use crate::{
|
||||
pub struct TaskService {
|
||||
pub config: TaskConfig,
|
||||
ctx: Arc<dyn AppContextTrait>,
|
||||
subscriber_task_storage: Arc<RwLock<PostgresStorage<SubscriberTask>>>,
|
||||
system_task_storage: Arc<RwLock<PostgresStorage<SystemTask>>>,
|
||||
subscriber_task_storage: Arc<RwLock<ApalisPostgresStorage<SubscriberTask>>>,
|
||||
system_task_storage: Arc<RwLock<ApalisPostgresStorage<SystemTask>>>,
|
||||
cron_worker_id: String,
|
||||
}
|
||||
|
||||
impl TaskService {
|
||||
@ -43,12 +46,13 @@ impl TaskService {
|
||||
let system_task_storage_config =
|
||||
Config::new(SYSTEM_TASK_APALIS_NAME).set_keep_alive(config.system_task_timeout);
|
||||
let subscriber_task_storage =
|
||||
PostgresStorage::new_with_config(pool.clone(), subscriber_task_storage_config);
|
||||
ApalisPostgresStorage::new_with_config(pool.clone(), subscriber_task_storage_config);
|
||||
let system_task_storage =
|
||||
PostgresStorage::new_with_config(pool, system_task_storage_config);
|
||||
ApalisPostgresStorage::new_with_config(pool, system_task_storage_config);
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
cron_worker_id: nanoid::nanoid!(),
|
||||
ctx,
|
||||
subscriber_task_storage: Arc::new(RwLock::new(subscriber_task_storage)),
|
||||
system_task_storage: Arc::new(RwLock::new(system_task_storage)),
|
||||
@ -132,8 +136,70 @@ impl TaskService {
|
||||
Ok(task_id)
|
||||
}
|
||||
|
||||
pub async fn setup_monitor(&self) -> RecorderResult<Monitor> {
|
||||
let mut monitor = Monitor::new();
|
||||
pub async fn run<F, Fut>(&self, shutdown_signal: Option<F>) -> RecorderResult<()>
|
||||
where
|
||||
F: Fn() -> Fut + Send + 'static,
|
||||
Fut: Future<Output = ()> + Send,
|
||||
{
|
||||
tokio::try_join!(
|
||||
async {
|
||||
let monitor = self.setup_apalis_monitor().await?;
|
||||
if let Some(shutdown_signal) = shutdown_signal {
|
||||
monitor
|
||||
.run_with_signal(async move {
|
||||
shutdown_signal().await;
|
||||
tracing::info!("apalis shutting down...");
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
} else {
|
||||
monitor.run().await?;
|
||||
}
|
||||
Ok::<_, RecorderError>(())
|
||||
},
|
||||
async {
|
||||
let listener = self.setup_apalis_listener().await?;
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(e) = listener.listen().await {
|
||||
tracing::error!("Error listening to apalis: {e}");
|
||||
}
|
||||
});
|
||||
Ok::<_, RecorderError>(())
|
||||
},
|
||||
async {
|
||||
let listener = self.setup_cron_due_listening().await?;
|
||||
let ctx = self.ctx.clone();
|
||||
let cron_worker_id = self.cron_worker_id.clone();
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(e) = Self::listen_cron_due(listener, ctx, &cron_worker_id).await {
|
||||
tracing::error!("Error listening to cron due: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
Ok::<_, RecorderError>(())
|
||||
},
|
||||
async {
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
|
||||
let ctx = self.ctx.clone();
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
interval.tick().await;
|
||||
if let Err(e) = cron::Model::check_and_trigger_due_crons(ctx.as_ref()).await
|
||||
{
|
||||
tracing::error!("Error checking and triggering due crons: {e}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok::<_, RecorderError>(())
|
||||
}
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn setup_apalis_monitor(&self) -> RecorderResult<Monitor> {
|
||||
let mut apalis_monitor = Monitor::new();
|
||||
|
||||
{
|
||||
let subscriber_task_worker = WorkerBuilder::new(SUBSCRIBER_TASK_APALIS_NAME)
|
||||
@ -155,28 +221,51 @@ impl TaskService {
|
||||
.backend(self.system_task_storage.read().await.clone())
|
||||
.build_fn(Self::run_system_task);
|
||||
|
||||
monitor = monitor
|
||||
apalis_monitor = apalis_monitor
|
||||
.register(subscriber_task_worker)
|
||||
.register(system_task_worker);
|
||||
}
|
||||
|
||||
Ok(monitor)
|
||||
Ok(apalis_monitor)
|
||||
}
|
||||
|
||||
pub async fn setup_listener(&self) -> RecorderResult<PgListen> {
|
||||
async fn setup_apalis_listener(&self) -> RecorderResult<ApalisPgListen> {
|
||||
let pool = self.ctx.db().get_postgres_connection_pool().clone();
|
||||
let mut task_listener = PgListen::new(pool).await?;
|
||||
let mut apalis_pg_listener = ApalisPgListen::new(pool).await?;
|
||||
|
||||
{
|
||||
let mut subscriber_task_storage = self.subscriber_task_storage.write().await;
|
||||
task_listener.subscribe_with(&mut subscriber_task_storage);
|
||||
apalis_pg_listener.subscribe_with(&mut subscriber_task_storage);
|
||||
}
|
||||
|
||||
{
|
||||
let mut system_task_storage = self.system_task_storage.write().await;
|
||||
task_listener.subscribe_with(&mut system_task_storage);
|
||||
apalis_pg_listener.subscribe_with(&mut system_task_storage);
|
||||
}
|
||||
|
||||
Ok(task_listener)
|
||||
Ok(apalis_pg_listener)
|
||||
}
|
||||
|
||||
async fn setup_cron_due_listening(&self) -> RecorderResult<PgListener> {
|
||||
let pool = self.ctx.db().get_postgres_connection_pool().clone();
|
||||
let listener = PgListener::connect_with(&pool).await?;
|
||||
|
||||
Ok(listener)
|
||||
}
|
||||
|
||||
async fn listen_cron_due(
|
||||
mut listener: PgListener,
|
||||
ctx: Arc<dyn AppContextTrait>,
|
||||
worker_id: &str,
|
||||
) -> RecorderResult<()> {
|
||||
listener.listen(CRON_DUE_EVENT).await?;
|
||||
loop {
|
||||
let notification = listener.recv().await?;
|
||||
if let Err(e) =
|
||||
cron::Model::handle_cron_notification(ctx.as_ref(), notification, worker_id).await
|
||||
{
|
||||
tracing::error!("Error handling cron notification: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -9,12 +9,12 @@ pub trait ControllerTrait: Sized {
|
||||
-> Router<Arc<dyn AppContextTrait>>;
|
||||
}
|
||||
|
||||
pub struct PrefixController {
|
||||
pub struct NestRouterController {
|
||||
prefix: Cow<'static, str>,
|
||||
router: Router<Arc<dyn AppContextTrait>>,
|
||||
}
|
||||
|
||||
impl PrefixController {
|
||||
impl NestRouterController {
|
||||
pub fn new(
|
||||
prefix: impl Into<Cow<'static, str>>,
|
||||
router: Router<Arc<dyn AppContextTrait>>,
|
||||
@ -26,7 +26,7 @@ impl PrefixController {
|
||||
}
|
||||
}
|
||||
|
||||
impl ControllerTrait for PrefixController {
|
||||
impl ControllerTrait for NestRouterController {
|
||||
fn apply_to(
|
||||
self,
|
||||
router: Router<Arc<dyn AppContextTrait>>,
|
||||
@ -36,15 +36,15 @@ impl ControllerTrait for PrefixController {
|
||||
}
|
||||
|
||||
pub enum Controller {
|
||||
Prefix(PrefixController),
|
||||
NestRouter(NestRouterController),
|
||||
}
|
||||
|
||||
impl Controller {
|
||||
pub fn from_prefix(
|
||||
pub fn from_nest_router(
|
||||
prefix: impl Into<Cow<'static, str>>,
|
||||
router: Router<Arc<dyn AppContextTrait>>,
|
||||
) -> Self {
|
||||
Self::Prefix(PrefixController::new(prefix, router))
|
||||
Self::NestRouter(NestRouterController::new(prefix, router))
|
||||
}
|
||||
}
|
||||
|
||||
@ -54,7 +54,7 @@ impl ControllerTrait for Controller {
|
||||
router: Router<Arc<dyn AppContextTrait>>,
|
||||
) -> Router<Arc<dyn AppContextTrait>> {
|
||||
match self {
|
||||
Self::Prefix(p) => p.apply_to(router),
|
||||
Self::NestRouter(p) => p.apply_to(router),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -38,5 +38,5 @@ async fn rss_handler(
|
||||
pub async fn create(_ctx: Arc<dyn AppContextTrait>) -> RecorderResult<Controller> {
|
||||
let router = Router::<Arc<dyn AppContextTrait>>::new().route("/rss/{token}", get(rss_handler));
|
||||
|
||||
Ok(Controller::from_prefix(CONTROLLER_PREFIX, router))
|
||||
Ok(Controller::from_nest_router(CONTROLLER_PREFIX, router))
|
||||
}
|
||||
|
@ -71,5 +71,5 @@ pub async fn create(ctx: Arc<dyn AppContextTrait>) -> RecorderResult<Controller>
|
||||
post(graphql_handler).layer(from_fn_with_state(ctx, auth_middleware)),
|
||||
)
|
||||
.route("/introspection", introspection_handler);
|
||||
Ok(Controller::from_prefix(CONTROLLER_PREFIX, router))
|
||||
Ok(Controller::from_nest_router(CONTROLLER_PREFIX, router))
|
||||
}
|
||||
|
@ -38,5 +38,5 @@ pub async fn create(_context: Arc<dyn AppContextTrait>) -> RecorderResult<Contro
|
||||
.route("/health", get(health))
|
||||
.route("/ping", get(ping));
|
||||
|
||||
Ok(Controller::from_prefix(CONTROLLER_PREFIX, router))
|
||||
Ok(Controller::from_nest_router(CONTROLLER_PREFIX, router))
|
||||
}
|
||||
|
@ -5,4 +5,4 @@ pub mod metadata;
|
||||
pub mod oidc;
|
||||
pub mod r#static;
|
||||
|
||||
pub use core::{Controller, ControllerTrait, PrefixController};
|
||||
pub use core::{Controller, ControllerTrait, NestRouterController};
|
||||
|
@ -77,5 +77,5 @@ pub async fn create(_context: Arc<dyn AppContextTrait>) -> RecorderResult<Contro
|
||||
.route("/auth", get(oidc_auth))
|
||||
.route("/callback", get(oidc_callback));
|
||||
|
||||
Ok(Controller::from_prefix(CONTROLLER_PREFIX, router))
|
||||
Ok(Controller::from_nest_router(CONTROLLER_PREFIX, router))
|
||||
}
|
||||
|
@ -99,5 +99,5 @@ pub async fn create(ctx: Arc<dyn AppContextTrait>) -> RecorderResult<Controller>
|
||||
)
|
||||
.route("/public/{*path}", get(serve_public_static));
|
||||
|
||||
Ok(Controller::from_prefix(CONTROLLER_PREFIX, router))
|
||||
Ok(Controller::from_nest_router(CONTROLLER_PREFIX, router))
|
||||
}
|
||||
|
@ -1,8 +0,0 @@
|
||||
AUTH_TYPE = "basic" # or oidc
|
||||
BASIC_USER = "konobangu"
|
||||
BASIC_PASSWORD = "konobangu"
|
||||
# OIDC_ISSUER="https://auth.logto.io/oidc"
|
||||
# OIDC_AUDIENCE = "https://konobangu.com/api"
|
||||
# OIDC_CLIENT_ID = "client_id"
|
||||
# OIDC_CLIENT_SECRET = "client_secret" # optional
|
||||
# OIDC_EXTRA_SCOPES = "read:konobangu write:konobangu"
|
8
apps/webui/.env.development
Normal file
8
apps/webui/.env.development
Normal file
@ -0,0 +1,8 @@
|
||||
AUTH__AUTH_TYPE = "basic" # or oidc
|
||||
AUTH__BASIC_USER = "konobangu"
|
||||
AUTH__BASIC_PASSWORD = "konobangu"
|
||||
# AUTH__OIDC_ISSUER="https://auth.logto.io/oidc"
|
||||
# AUTH__OIDC_AUDIENCE = "https://konobangu.com/api"
|
||||
# AUTH__OIDC_CLIENT_ID = "client_id"
|
||||
# AUTH__OIDC_CLIENT_SECRET = "client_secret" # optional
|
||||
# AUTH__OIDC_EXTRA_SCOPES = "read:konobangu write:konobangu"
|
6
apps/webui/.env.production.example
Normal file
6
apps/webui/.env.production.example
Normal file
@ -0,0 +1,6 @@
|
||||
AUTH__AUTH_TYPE = "basic" # or oidc
|
||||
# AUTH__OIDC_ISSUER="https://auth.logto.io/oidc"
|
||||
# AUTH__OIDC_AUDIENCE = "https://konobangu.com/api"
|
||||
# AUTH__OIDC_CLIENT_ID = "client_id"
|
||||
# AUTH__OIDC_CLIENT_SECRET = "client_secret" # optional
|
||||
# AUTH__OIDC_EXTRA_SCOPES = "read:konobangu write:konobangu"
|
@ -20,15 +20,23 @@ export default defineConfig({
|
||||
index: './src/main.tsx',
|
||||
},
|
||||
define: {
|
||||
'process.env.AUTH_TYPE': JSON.stringify(process.env.AUTH_TYPE),
|
||||
'process.env.OIDC_CLIENT_ID': JSON.stringify(process.env.OIDC_CLIENT_ID),
|
||||
'process.env.OIDC_CLIENT_SECRET': JSON.stringify(
|
||||
process.env.OIDC_CLIENT_SECRET
|
||||
'process.env.AUTH__AUTH_TYPE': JSON.stringify(
|
||||
process.env.AUTH__AUTH_TYPE
|
||||
),
|
||||
'process.env.OIDC_ISSUER': JSON.stringify(process.env.OIDC_ISSUER),
|
||||
'process.env.OIDC_AUDIENCE': JSON.stringify(process.env.OIDC_AUDIENCE),
|
||||
'process.env.OIDC_EXTRA_SCOPES': JSON.stringify(
|
||||
process.env.OIDC_EXTRA_SCOPES
|
||||
'process.env.AUTH__OIDC_CLIENT_ID': JSON.stringify(
|
||||
process.env.AUTH__OIDC_CLIENT_ID
|
||||
),
|
||||
'process.env.AUTH__OIDC_CLIENT_SECRET': JSON.stringify(
|
||||
process.env.AUTH__OIDC_CLIENT_SECRET
|
||||
),
|
||||
'process.env.AUTH__OIDC_ISSUER': JSON.stringify(
|
||||
process.env.AUTH__OIDC_ISSUER
|
||||
),
|
||||
'process.env.AUTH__OIDC_AUDIENCE': JSON.stringify(
|
||||
process.env.AUTH__OIDC_AUDIENCE
|
||||
),
|
||||
'process.env.AUTH__OIDC_EXTRA_SCOPES': JSON.stringify(
|
||||
process.env.AUTH__OIDC_EXTRA_SCOPES
|
||||
),
|
||||
},
|
||||
},
|
||||
@ -39,7 +47,7 @@ export default defineConfig({
|
||||
setupMiddlewares: [
|
||||
(middlewares) => {
|
||||
middlewares.unshift((req, res, next) => {
|
||||
if (process.env.AUTH_TYPE === 'basic') {
|
||||
if (process.env.AUTH__AUTH_TYPE === 'basic') {
|
||||
res.setHeader('WWW-Authenticate', 'Basic realm="konobangu"');
|
||||
|
||||
const authorization =
|
||||
@ -49,8 +57,8 @@ export default defineConfig({
|
||||
.split(':');
|
||||
|
||||
if (
|
||||
user !== process.env.BASIC_USER ||
|
||||
password !== process.env.BASIC_PASSWORD
|
||||
user !== process.env.AUTH__BASIC_USER ||
|
||||
password !== process.env.AUTH__BASIC_PASSWORD
|
||||
) {
|
||||
res.statusCode = 401;
|
||||
res.write('Unauthorized');
|
||||
|
@ -1,6 +1,6 @@
|
||||
"use client";
|
||||
|
||||
import { MoreHorizontal } from "lucide-react";
|
||||
import { Eye, MoreHorizontal } from "lucide-react";
|
||||
|
||||
import { Button } from "@/components/ui/button";
|
||||
import {
|
||||
@ -18,7 +18,7 @@ import { ComponentProps, PropsWithChildren } from "react";
|
||||
interface DropdownMenuActionsProps<Id>
|
||||
extends ComponentProps<typeof DropdownMenuPrimitive.Root> {
|
||||
id: Id;
|
||||
showDetail?: boolean;
|
||||
showDetail?: boolean | "dropdown-menu";
|
||||
showEdit?: boolean;
|
||||
showDelete?: boolean;
|
||||
onDetail?: (id: Id) => void;
|
||||
@ -38,34 +38,49 @@ export function DropdownMenuActions<Id>({
|
||||
...rest
|
||||
}: PropsWithChildren<DropdownMenuActionsProps<Id>>) {
|
||||
return (
|
||||
<DropdownMenu {...rest}>
|
||||
<DropdownMenuTrigger asChild>
|
||||
<div className="flex gap-2 items-center justify-center">
|
||||
{showDetail === true && (
|
||||
<Button
|
||||
variant="ghost"
|
||||
className="flex h-8 w-8 p-0 data-[state=open]:bg-muted"
|
||||
onClick={() => onDetail?.(id)}
|
||||
>
|
||||
<MoreHorizontal />
|
||||
<span className="sr-only">Open menu</span>
|
||||
<Eye />
|
||||
<span className="sr-only">Detail</span>
|
||||
</Button>
|
||||
</DropdownMenuTrigger>
|
||||
<DropdownMenuContent align="end" className="w-[160px]">
|
||||
{children}
|
||||
{showDetail && (
|
||||
<DropdownMenuItem onClick={() => onDetail?.(id)}>
|
||||
Detail
|
||||
</DropdownMenuItem>
|
||||
)}
|
||||
{showEdit && (
|
||||
<DropdownMenuItem onClick={() => onEdit?.(id)}>Edit</DropdownMenuItem>
|
||||
)}
|
||||
{(showDetail || showEdit) && showDelete && <DropdownMenuSeparator />}
|
||||
{showDelete && (
|
||||
<DropdownMenuItem onClick={() => onDelete?.(id)}>
|
||||
Delete
|
||||
<DropdownMenuShortcut>⌘⌫</DropdownMenuShortcut>
|
||||
</DropdownMenuItem>
|
||||
)}
|
||||
</DropdownMenuContent>
|
||||
</DropdownMenu>
|
||||
)}
|
||||
<DropdownMenu {...rest}>
|
||||
<DropdownMenuTrigger asChild>
|
||||
<Button
|
||||
variant="ghost"
|
||||
className="flex h-8 w-8 p-0 data-[state=open]:bg-muted"
|
||||
>
|
||||
<MoreHorizontal />
|
||||
<span className="sr-only">Open menu</span>
|
||||
</Button>
|
||||
</DropdownMenuTrigger>
|
||||
<DropdownMenuContent align="end" className="w-[160px]">
|
||||
{children}
|
||||
{showDetail === "dropdown-menu" && (
|
||||
<DropdownMenuItem onClick={() => onDetail?.(id)}>
|
||||
Detail
|
||||
</DropdownMenuItem>
|
||||
)}
|
||||
{showEdit && (
|
||||
<DropdownMenuItem onClick={() => onEdit?.(id)}>
|
||||
Edit
|
||||
</DropdownMenuItem>
|
||||
)}
|
||||
{(showDetail === "dropdown-menu" || showEdit || children) &&
|
||||
showDelete && <DropdownMenuSeparator />}
|
||||
{showDelete && (
|
||||
<DropdownMenuItem onClick={() => onDelete?.(id)}>
|
||||
Delete
|
||||
<DropdownMenuShortcut>⌘⌫</DropdownMenuShortcut>
|
||||
</DropdownMenuItem>
|
||||
)}
|
||||
</DropdownMenuContent>
|
||||
</DropdownMenu>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
@ -105,6 +105,13 @@ query GetSubscriptionDetail ($id: Int!) {
|
||||
feedSource
|
||||
}
|
||||
}
|
||||
subscriberTask {
|
||||
nodes {
|
||||
id
|
||||
taskType
|
||||
status
|
||||
}
|
||||
}
|
||||
credential3rd {
|
||||
id
|
||||
username
|
||||
|
@ -8,5 +8,5 @@ export const AUTH_METHOD = {
|
||||
export type AuthMethodType = ValueOf<typeof AUTH_METHOD>;
|
||||
|
||||
export function getAppAuthMethod(): AuthMethodType {
|
||||
return process.env.AUTH_TYPE as AuthMethodType;
|
||||
return process.env.AUTH__AUTH_TYPE as AuthMethodType;
|
||||
}
|
||||
|
@ -3,16 +3,16 @@ import { LogLevel, type OpenIdConfiguration } from 'oidc-client-rx';
|
||||
export function buildOidcConfig(): OpenIdConfiguration {
|
||||
const origin = window.location.origin;
|
||||
|
||||
const resource = process.env.OIDC_AUDIENCE!;
|
||||
const resource = process.env.AUTH__OIDC_AUDIENCE!;
|
||||
|
||||
return {
|
||||
authority: process.env.OIDC_ISSUER!,
|
||||
authority: process.env.AUTH__OIDC_ISSUER!,
|
||||
redirectUrl: `${origin}/auth/oidc/callback`,
|
||||
postLogoutRedirectUri: `${origin}/`,
|
||||
clientId: process.env.OIDC_CLIENT_ID!,
|
||||
clientSecret: process.env.OIDC_CLIENT_SECRET,
|
||||
scope: process.env.OIDC_EXTRA_SCOPES
|
||||
? `openid profile email offline_access ${process.env.OIDC_EXTRA_SCOPES}`
|
||||
clientId: process.env.AUTH__OIDC_CLIENT_ID!,
|
||||
clientSecret: process.env.AUTH__OIDC_CLIENT_SECRET,
|
||||
scope: process.env.AUTH__OIDC_EXTRA_SCOPES
|
||||
? `openid profile email offline_access ${process.env.AUTH__OIDC_EXTRA_SCOPES}`
|
||||
: 'openid profile email offline_access',
|
||||
triggerAuthorizationResultEvent: true,
|
||||
responseType: 'code',
|
||||
|
@ -26,7 +26,7 @@ type Documents = {
|
||||
"\n mutation InsertSubscription($data: SubscriptionsInsertInput!) {\n subscriptionsCreateOne(data: $data) {\n id\n createdAt\n updatedAt\n displayName\n category\n sourceUrl\n enabled\n credentialId\n }\n }\n": typeof types.InsertSubscriptionDocument,
|
||||
"\n mutation UpdateSubscriptions(\n $data: SubscriptionsUpdateInput!,\n $filters: SubscriptionsFilterInput!,\n ) {\n subscriptionsUpdate (\n data: $data\n filter: $filters\n ) {\n id\n createdAt\n updatedAt\n displayName\n category\n sourceUrl\n enabled\n }\n}\n": typeof types.UpdateSubscriptionsDocument,
|
||||
"\n mutation DeleteSubscriptions($filters: SubscriptionsFilterInput) {\n subscriptionsDelete(filter: $filters)\n }\n": typeof types.DeleteSubscriptionsDocument,
|
||||
"\nquery GetSubscriptionDetail ($id: Int!) {\n subscriptions(filters: { id: {\n eq: $id\n } }) {\n nodes {\n id\n displayName\n createdAt\n updatedAt\n category\n sourceUrl\n enabled\n feed {\n nodes {\n id\n createdAt\n updatedAt\n token\n feedType\n feedSource\n }\n }\n credential3rd {\n id\n username\n }\n bangumi {\n nodes {\n createdAt\n updatedAt\n id\n mikanBangumiId\n displayName\n season\n seasonRaw\n fansub\n mikanFansubId\n rssLink\n posterLink\n homepage\n }\n }\n }\n }\n}\n": typeof types.GetSubscriptionDetailDocument,
|
||||
"\nquery GetSubscriptionDetail ($id: Int!) {\n subscriptions(filters: { id: {\n eq: $id\n } }) {\n nodes {\n id\n displayName\n createdAt\n updatedAt\n category\n sourceUrl\n enabled\n feed {\n nodes {\n id\n createdAt\n updatedAt\n token\n feedType\n feedSource\n }\n }\n subscriberTask {\n nodes {\n id\n taskType\n status\n }\n }\n credential3rd {\n id\n username\n }\n bangumi {\n nodes {\n createdAt\n updatedAt\n id\n mikanBangumiId\n displayName\n season\n seasonRaw\n fansub\n mikanFansubId\n rssLink\n posterLink\n homepage\n }\n }\n }\n }\n}\n": typeof types.GetSubscriptionDetailDocument,
|
||||
"\n mutation SyncSubscriptionFeedsIncremental($filter: SubscriptionsFilterInput!) {\n subscriptionsSyncOneFeedsIncremental(filter: $filter) {\n id\n }\n }\n": typeof types.SyncSubscriptionFeedsIncrementalDocument,
|
||||
"\n mutation SyncSubscriptionFeedsFull($filter: SubscriptionsFilterInput!) {\n subscriptionsSyncOneFeedsFull(filter: $filter) {\n id\n }\n }\n": typeof types.SyncSubscriptionFeedsFullDocument,
|
||||
"\n mutation SyncSubscriptionSources($filter: SubscriptionsFilterInput!) {\n subscriptionsSyncOneSources(filter: $filter) {\n id\n }\n }\n": typeof types.SyncSubscriptionSourcesDocument,
|
||||
@ -47,7 +47,7 @@ const documents: Documents = {
|
||||
"\n mutation InsertSubscription($data: SubscriptionsInsertInput!) {\n subscriptionsCreateOne(data: $data) {\n id\n createdAt\n updatedAt\n displayName\n category\n sourceUrl\n enabled\n credentialId\n }\n }\n": types.InsertSubscriptionDocument,
|
||||
"\n mutation UpdateSubscriptions(\n $data: SubscriptionsUpdateInput!,\n $filters: SubscriptionsFilterInput!,\n ) {\n subscriptionsUpdate (\n data: $data\n filter: $filters\n ) {\n id\n createdAt\n updatedAt\n displayName\n category\n sourceUrl\n enabled\n }\n}\n": types.UpdateSubscriptionsDocument,
|
||||
"\n mutation DeleteSubscriptions($filters: SubscriptionsFilterInput) {\n subscriptionsDelete(filter: $filters)\n }\n": types.DeleteSubscriptionsDocument,
|
||||
"\nquery GetSubscriptionDetail ($id: Int!) {\n subscriptions(filters: { id: {\n eq: $id\n } }) {\n nodes {\n id\n displayName\n createdAt\n updatedAt\n category\n sourceUrl\n enabled\n feed {\n nodes {\n id\n createdAt\n updatedAt\n token\n feedType\n feedSource\n }\n }\n credential3rd {\n id\n username\n }\n bangumi {\n nodes {\n createdAt\n updatedAt\n id\n mikanBangumiId\n displayName\n season\n seasonRaw\n fansub\n mikanFansubId\n rssLink\n posterLink\n homepage\n }\n }\n }\n }\n}\n": types.GetSubscriptionDetailDocument,
|
||||
"\nquery GetSubscriptionDetail ($id: Int!) {\n subscriptions(filters: { id: {\n eq: $id\n } }) {\n nodes {\n id\n displayName\n createdAt\n updatedAt\n category\n sourceUrl\n enabled\n feed {\n nodes {\n id\n createdAt\n updatedAt\n token\n feedType\n feedSource\n }\n }\n subscriberTask {\n nodes {\n id\n taskType\n status\n }\n }\n credential3rd {\n id\n username\n }\n bangumi {\n nodes {\n createdAt\n updatedAt\n id\n mikanBangumiId\n displayName\n season\n seasonRaw\n fansub\n mikanFansubId\n rssLink\n posterLink\n homepage\n }\n }\n }\n }\n}\n": types.GetSubscriptionDetailDocument,
|
||||
"\n mutation SyncSubscriptionFeedsIncremental($filter: SubscriptionsFilterInput!) {\n subscriptionsSyncOneFeedsIncremental(filter: $filter) {\n id\n }\n }\n": types.SyncSubscriptionFeedsIncrementalDocument,
|
||||
"\n mutation SyncSubscriptionFeedsFull($filter: SubscriptionsFilterInput!) {\n subscriptionsSyncOneFeedsFull(filter: $filter) {\n id\n }\n }\n": types.SyncSubscriptionFeedsFullDocument,
|
||||
"\n mutation SyncSubscriptionSources($filter: SubscriptionsFilterInput!) {\n subscriptionsSyncOneSources(filter: $filter) {\n id\n }\n }\n": types.SyncSubscriptionSourcesDocument,
|
||||
@ -121,7 +121,7 @@ export function gql(source: "\n mutation DeleteSubscriptions($filters: Subscr
|
||||
/**
|
||||
* The gql function is used to parse GraphQL queries into a document that can be used by GraphQL clients.
|
||||
*/
|
||||
export function gql(source: "\nquery GetSubscriptionDetail ($id: Int!) {\n subscriptions(filters: { id: {\n eq: $id\n } }) {\n nodes {\n id\n displayName\n createdAt\n updatedAt\n category\n sourceUrl\n enabled\n feed {\n nodes {\n id\n createdAt\n updatedAt\n token\n feedType\n feedSource\n }\n }\n credential3rd {\n id\n username\n }\n bangumi {\n nodes {\n createdAt\n updatedAt\n id\n mikanBangumiId\n displayName\n season\n seasonRaw\n fansub\n mikanFansubId\n rssLink\n posterLink\n homepage\n }\n }\n }\n }\n}\n"): (typeof documents)["\nquery GetSubscriptionDetail ($id: Int!) {\n subscriptions(filters: { id: {\n eq: $id\n } }) {\n nodes {\n id\n displayName\n createdAt\n updatedAt\n category\n sourceUrl\n enabled\n feed {\n nodes {\n id\n createdAt\n updatedAt\n token\n feedType\n feedSource\n }\n }\n credential3rd {\n id\n username\n }\n bangumi {\n nodes {\n createdAt\n updatedAt\n id\n mikanBangumiId\n displayName\n season\n seasonRaw\n fansub\n mikanFansubId\n rssLink\n posterLink\n homepage\n }\n }\n }\n }\n}\n"];
|
||||
export function gql(source: "\nquery GetSubscriptionDetail ($id: Int!) {\n subscriptions(filters: { id: {\n eq: $id\n } }) {\n nodes {\n id\n displayName\n createdAt\n updatedAt\n category\n sourceUrl\n enabled\n feed {\n nodes {\n id\n createdAt\n updatedAt\n token\n feedType\n feedSource\n }\n }\n subscriberTask {\n nodes {\n id\n taskType\n status\n }\n }\n credential3rd {\n id\n username\n }\n bangumi {\n nodes {\n createdAt\n updatedAt\n id\n mikanBangumiId\n displayName\n season\n seasonRaw\n fansub\n mikanFansubId\n rssLink\n posterLink\n homepage\n }\n }\n }\n }\n}\n"): (typeof documents)["\nquery GetSubscriptionDetail ($id: Int!) {\n subscriptions(filters: { id: {\n eq: $id\n } }) {\n nodes {\n id\n displayName\n createdAt\n updatedAt\n category\n sourceUrl\n enabled\n feed {\n nodes {\n id\n createdAt\n updatedAt\n token\n feedType\n feedSource\n }\n }\n subscriberTask {\n nodes {\n id\n taskType\n status\n }\n }\n credential3rd {\n id\n username\n }\n bangumi {\n nodes {\n createdAt\n updatedAt\n id\n mikanBangumiId\n displayName\n season\n seasonRaw\n fansub\n mikanFansubId\n rssLink\n posterLink\n homepage\n }\n }\n }\n }\n}\n"];
|
||||
/**
|
||||
* The gql function is used to parse GraphQL queries into a document that can be used by GraphQL clients.
|
||||
*/
|
||||
|
@ -1441,6 +1441,8 @@ export type SubscriberTasks = {
|
||||
status: SubscriberTaskStatusEnum;
|
||||
subscriber?: Maybe<Subscribers>;
|
||||
subscriberId: Scalars['Int']['output'];
|
||||
subscription?: Maybe<Subscriptions>;
|
||||
subscriptionId?: Maybe<Scalars['Int']['output']>;
|
||||
taskType: SubscriberTaskTypeEnum;
|
||||
};
|
||||
|
||||
@ -1473,6 +1475,7 @@ export type SubscriberTasksFilterInput = {
|
||||
runAt?: InputMaybe<TextFilterInput>;
|
||||
status?: InputMaybe<StringFilterInput>;
|
||||
subscriberId?: InputMaybe<SubscriberIdFilterInput>;
|
||||
subscriptionId?: InputMaybe<IntegerFilterInput>;
|
||||
taskType?: InputMaybe<StringFilterInput>;
|
||||
};
|
||||
|
||||
@ -1489,6 +1492,7 @@ export type SubscriberTasksOrderInput = {
|
||||
runAt?: InputMaybe<OrderByEnum>;
|
||||
status?: InputMaybe<OrderByEnum>;
|
||||
subscriberId?: InputMaybe<OrderByEnum>;
|
||||
subscriptionId?: InputMaybe<OrderByEnum>;
|
||||
taskType?: InputMaybe<OrderByEnum>;
|
||||
};
|
||||
|
||||
@ -1745,6 +1749,7 @@ export type Subscriptions = {
|
||||
sourceUrl: Scalars['String']['output'];
|
||||
subscriber?: Maybe<Subscribers>;
|
||||
subscriberId: Scalars['Int']['output'];
|
||||
subscriberTask: SubscriberTasksConnection;
|
||||
subscriptionBangumi: SubscriptionBangumiConnection;
|
||||
subscriptionEpisode: SubscriptionEpisodeConnection;
|
||||
updatedAt: Scalars['String']['output'];
|
||||
@ -1772,6 +1777,13 @@ export type SubscriptionsFeedArgs = {
|
||||
};
|
||||
|
||||
|
||||
export type SubscriptionsSubscriberTaskArgs = {
|
||||
filters?: InputMaybe<SubscriberTasksFilterInput>;
|
||||
orderBy?: InputMaybe<SubscriberTasksOrderInput>;
|
||||
pagination?: InputMaybe<PaginationInput>;
|
||||
};
|
||||
|
||||
|
||||
export type SubscriptionsSubscriptionBangumiArgs = {
|
||||
filters?: InputMaybe<SubscriptionBangumiFilterInput>;
|
||||
orderBy?: InputMaybe<SubscriptionBangumiOrderInput>;
|
||||
@ -1971,7 +1983,7 @@ export type GetSubscriptionDetailQueryVariables = Exact<{
|
||||
}>;
|
||||
|
||||
|
||||
export type GetSubscriptionDetailQuery = { __typename?: 'Query', subscriptions: { __typename?: 'SubscriptionsConnection', nodes: Array<{ __typename?: 'Subscriptions', id: number, displayName: string, createdAt: string, updatedAt: string, category: SubscriptionCategoryEnum, sourceUrl: string, enabled: boolean, feed: { __typename?: 'FeedsConnection', nodes: Array<{ __typename?: 'Feeds', id: number, createdAt: string, updatedAt: string, token: string, feedType: FeedTypeEnum, feedSource: FeedSourceEnum }> }, credential3rd?: { __typename?: 'Credential3rd', id: number, username?: string | null } | null, bangumi: { __typename?: 'BangumiConnection', nodes: Array<{ __typename?: 'Bangumi', createdAt: string, updatedAt: string, id: number, mikanBangumiId?: string | null, displayName: string, season: number, seasonRaw?: string | null, fansub?: string | null, mikanFansubId?: string | null, rssLink?: string | null, posterLink?: string | null, homepage?: string | null }> } }> } };
|
||||
export type GetSubscriptionDetailQuery = { __typename?: 'Query', subscriptions: { __typename?: 'SubscriptionsConnection', nodes: Array<{ __typename?: 'Subscriptions', id: number, displayName: string, createdAt: string, updatedAt: string, category: SubscriptionCategoryEnum, sourceUrl: string, enabled: boolean, feed: { __typename?: 'FeedsConnection', nodes: Array<{ __typename?: 'Feeds', id: number, createdAt: string, updatedAt: string, token: string, feedType: FeedTypeEnum, feedSource: FeedSourceEnum }> }, subscriberTask: { __typename?: 'SubscriberTasksConnection', nodes: Array<{ __typename?: 'SubscriberTasks', id: string, taskType: SubscriberTaskTypeEnum, status: SubscriberTaskStatusEnum }> }, credential3rd?: { __typename?: 'Credential3rd', id: number, username?: string | null } | null, bangumi: { __typename?: 'BangumiConnection', nodes: Array<{ __typename?: 'Bangumi', createdAt: string, updatedAt: string, id: number, mikanBangumiId?: string | null, displayName: string, season: number, seasonRaw?: string | null, fansub?: string | null, mikanFansubId?: string | null, rssLink?: string | null, posterLink?: string | null, homepage?: string | null }> } }> } };
|
||||
|
||||
export type SyncSubscriptionFeedsIncrementalMutationVariables = Exact<{
|
||||
filter: SubscriptionsFilterInput;
|
||||
@ -2030,7 +2042,7 @@ export const GetSubscriptionsDocument = {"kind":"Document","definitions":[{"kind
|
||||
export const InsertSubscriptionDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"mutation","name":{"kind":"Name","value":"InsertSubscription"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"data"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"SubscriptionsInsertInput"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"subscriptionsCreateOne"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"data"},"value":{"kind":"Variable","name":{"kind":"Name","value":"data"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}},{"kind":"Field","name":{"kind":"Name","value":"displayName"}},{"kind":"Field","name":{"kind":"Name","value":"category"}},{"kind":"Field","name":{"kind":"Name","value":"sourceUrl"}},{"kind":"Field","name":{"kind":"Name","value":"enabled"}},{"kind":"Field","name":{"kind":"Name","value":"credentialId"}}]}}]}}]} as unknown as DocumentNode<InsertSubscriptionMutation, InsertSubscriptionMutationVariables>;
|
||||
export const UpdateSubscriptionsDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"mutation","name":{"kind":"Name","value":"UpdateSubscriptions"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"data"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"SubscriptionsUpdateInput"}}}},{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"filters"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"SubscriptionsFilterInput"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"subscriptionsUpdate"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"data"},"value":{"kind":"Variable","name":{"kind":"Name","value":"data"}}},{"kind":"Argument","name":{"kind":"Name","value":"filter"},"value":{"kind":"Variable","name":{"kind":"Name","value":"filters"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}},{"kind":"Field","name":{"kind":"Name","value":"displayName"}},{"kind":"Field","name":{"kind":"Name","value":"category"}},{"kind":"Field","name":{"kind":"Name","value":"sourceUrl"}},{"kind":"Field","name":{"kind":"Name","value":"enabled"}}]}}]}}]} as unknown as DocumentNode<UpdateSubscriptionsMutation, UpdateSubscriptionsMutationVariables>;
|
||||
export const DeleteSubscriptionsDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"mutation","name":{"kind":"Name","value":"DeleteSubscriptions"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"filters"}},"type":{"kind":"NamedType","name":{"kind":"Name","value":"SubscriptionsFilterInput"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"subscriptionsDelete"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"filter"},"value":{"kind":"Variable","name":{"kind":"Name","value":"filters"}}}]}]}}]} as unknown as DocumentNode<DeleteSubscriptionsMutation, DeleteSubscriptionsMutationVariables>;
|
||||
export const GetSubscriptionDetailDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetSubscriptionDetail"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"id"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"Int"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"subscriptions"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"filters"},"value":{"kind":"ObjectValue","fields":[{"kind":"ObjectField","name":{"kind":"Name","value":"id"},"value":{"kind":"ObjectValue","fields":[{"kind":"ObjectField","name":{"kind":"Name","value":"eq"},"value":{"kind":"Variable","name":{"kind":"Name","value":"id"}}}]}}]}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"nodes"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"displayName"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}},{"kind":"Field","name":{"kind":"Name","value":"category"}},{"kind":"Field","name":{"kind":"Name","value":"sourceUrl"}},{"kind":"Field","name":{"kind":"Name","value":"enabled"}},{"kind":"Field","name":{"kind":"Name","value":"feed"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"nodes"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}},{"kind":"Field","name":{"kind":"Name","value":"token"}},{"kind":"Field","name":{"kind":"Name","value":"feedType"}},{"kind":"Field","name":{"kind":"Name","value":"feedSource"}}]}}]}},{"kind":"Field","name":{"kind":"Name","value":"credential3rd"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"username"}}]}},{"kind":"Field","name":{"kind":"Name","value":"bangumi"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"nodes"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}},{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"mikanBangumiId"}},{"kind":"Field","name":{"kind":"Name","value":"displayName"}},{"kind":"Field","name":{"kind":"Name","value":"season"}},{"kind":"Field","name":{"kind":"Name","value":"seasonRaw"}},{"kind":"Field","name":{"kind":"Name","value":"fansub"}},{"kind":"Field","name":{"kind":"Name","value":"mikanFansubId"}},{"kind":"Field","name":{"kind":"Name","value":"rssLink"}},{"kind":"Field","name":{"kind":"Name","value":"posterLink"}},{"kind":"Field","name":{"kind":"Name","value":"homepage"}}]}}]}}]}}]}}]}}]} as unknown as DocumentNode<GetSubscriptionDetailQuery, GetSubscriptionDetailQueryVariables>;
|
||||
export const GetSubscriptionDetailDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetSubscriptionDetail"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"id"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"Int"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"subscriptions"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"filters"},"value":{"kind":"ObjectValue","fields":[{"kind":"ObjectField","name":{"kind":"Name","value":"id"},"value":{"kind":"ObjectValue","fields":[{"kind":"ObjectField","name":{"kind":"Name","value":"eq"},"value":{"kind":"Variable","name":{"kind":"Name","value":"id"}}}]}}]}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"nodes"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"displayName"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}},{"kind":"Field","name":{"kind":"Name","value":"category"}},{"kind":"Field","name":{"kind":"Name","value":"sourceUrl"}},{"kind":"Field","name":{"kind":"Name","value":"enabled"}},{"kind":"Field","name":{"kind":"Name","value":"feed"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"nodes"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}},{"kind":"Field","name":{"kind":"Name","value":"token"}},{"kind":"Field","name":{"kind":"Name","value":"feedType"}},{"kind":"Field","name":{"kind":"Name","value":"feedSource"}}]}}]}},{"kind":"Field","name":{"kind":"Name","value":"subscriberTask"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"nodes"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"taskType"}},{"kind":"Field","name":{"kind":"Name","value":"status"}}]}}]}},{"kind":"Field","name":{"kind":"Name","value":"credential3rd"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"username"}}]}},{"kind":"Field","name":{"kind":"Name","value":"bangumi"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"nodes"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}},{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"mikanBangumiId"}},{"kind":"Field","name":{"kind":"Name","value":"displayName"}},{"kind":"Field","name":{"kind":"Name","value":"season"}},{"kind":"Field","name":{"kind":"Name","value":"seasonRaw"}},{"kind":"Field","name":{"kind":"Name","value":"fansub"}},{"kind":"Field","name":{"kind":"Name","value":"mikanFansubId"}},{"kind":"Field","name":{"kind":"Name","value":"rssLink"}},{"kind":"Field","name":{"kind":"Name","value":"posterLink"}},{"kind":"Field","name":{"kind":"Name","value":"homepage"}}]}}]}}]}}]}}]}}]} as unknown as DocumentNode<GetSubscriptionDetailQuery, GetSubscriptionDetailQueryVariables>;
|
||||
export const SyncSubscriptionFeedsIncrementalDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"mutation","name":{"kind":"Name","value":"SyncSubscriptionFeedsIncremental"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"filter"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"SubscriptionsFilterInput"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"subscriptionsSyncOneFeedsIncremental"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"filter"},"value":{"kind":"Variable","name":{"kind":"Name","value":"filter"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}}]}}]}}]} as unknown as DocumentNode<SyncSubscriptionFeedsIncrementalMutation, SyncSubscriptionFeedsIncrementalMutationVariables>;
|
||||
export const SyncSubscriptionFeedsFullDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"mutation","name":{"kind":"Name","value":"SyncSubscriptionFeedsFull"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"filter"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"SubscriptionsFilterInput"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"subscriptionsSyncOneFeedsFull"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"filter"},"value":{"kind":"Variable","name":{"kind":"Name","value":"filter"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}}]}}]}}]} as unknown as DocumentNode<SyncSubscriptionFeedsFullMutation, SyncSubscriptionFeedsFullMutationVariables>;
|
||||
export const SyncSubscriptionSourcesDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"mutation","name":{"kind":"Name","value":"SyncSubscriptionSources"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"filter"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"SubscriptionsFilterInput"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"subscriptionsSyncOneSources"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"filter"},"value":{"kind":"Variable","name":{"kind":"Name","value":"filter"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}}]}}]}}]} as unknown as DocumentNode<SyncSubscriptionSourcesMutation, SyncSubscriptionSourcesMutationVariables>;
|
||||
|
@ -51,6 +51,7 @@ import {
|
||||
} from 'lucide-react';
|
||||
import { useMemo } from 'react';
|
||||
import { toast } from 'sonner';
|
||||
import { prettyTaskType } from '../tasks/-pretty-task-type';
|
||||
import { SubscriptionSyncDialogContent } from './-sync';
|
||||
|
||||
export const Route = createFileRoute('/_app/subscriptions/detail/$id')({
|
||||
@ -212,18 +213,6 @@ function SubscriptionDetailRouteComponent() {
|
||||
</CardDescription>
|
||||
</div>
|
||||
<div className="flex gap-2">
|
||||
<Dialog>
|
||||
<DialogTrigger asChild>
|
||||
<Button variant="outline" size="sm">
|
||||
<RefreshCcwIcon className="h-4 w-4" />
|
||||
Sync
|
||||
</Button>
|
||||
</DialogTrigger>
|
||||
<SubscriptionSyncDialogContent
|
||||
id={subscription.id}
|
||||
onCancel={handleReload}
|
||||
/>
|
||||
</Dialog>
|
||||
<Button
|
||||
variant="outline"
|
||||
size="sm"
|
||||
@ -446,6 +435,64 @@ function SubscriptionDetailRouteComponent() {
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<Separator />
|
||||
<div className="space-y-4">
|
||||
<div className="flex items-center justify-between">
|
||||
<Label className="font-medium text-sm">Associated Tasks</Label>
|
||||
<Dialog>
|
||||
<DialogTrigger asChild>
|
||||
<Button variant="outline" size="sm">
|
||||
<RefreshCcwIcon className="h-4 w-4" />
|
||||
Sync
|
||||
</Button>
|
||||
</DialogTrigger>
|
||||
<SubscriptionSyncDialogContent
|
||||
id={subscription.id}
|
||||
onCancel={handleReload}
|
||||
/>
|
||||
</Dialog>
|
||||
</div>
|
||||
<div className="grid grid-cols-1 gap-3 sm:grid-cols-2 lg:grid-cols-3">
|
||||
{subscription.subscriberTask?.nodes &&
|
||||
subscription.subscriberTask.nodes.length > 0 ? (
|
||||
subscription.subscriberTask.nodes.map((task) => (
|
||||
<Card
|
||||
key={task.id}
|
||||
className="group relative cursor-pointer p-4 transition-colors hover:bg-accent/50"
|
||||
onClick={() =>
|
||||
navigate({
|
||||
to: '/tasks/detail/$id',
|
||||
params: {
|
||||
id: task.id,
|
||||
},
|
||||
})
|
||||
}
|
||||
>
|
||||
<div className="flex flex-col space-y-2">
|
||||
<div className="flex items-center justify-between">
|
||||
<Label className="font-medium text-sm capitalize">
|
||||
<span>{prettyTaskType(task.taskType)} Task</span>
|
||||
</Label>
|
||||
</div>
|
||||
|
||||
<code className="break-all rounded bg-muted px-2 py-1 font-mono text-xs">
|
||||
{task.id}
|
||||
</code>
|
||||
|
||||
<div className="text-muted-foreground text-xs">
|
||||
{task.status}
|
||||
</div>
|
||||
</div>
|
||||
</Card>
|
||||
))
|
||||
) : (
|
||||
<div className="col-span-full py-8 text-center text-muted-foreground">
|
||||
No associated tasks now
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{subscription.bangumi?.nodes &&
|
||||
subscription.bangumi.nodes.length > 0 && (
|
||||
<>
|
||||
@ -465,6 +512,7 @@ function SubscriptionDetailRouteComponent() {
|
||||
src={`/api/static${bangumi.posterLink}`}
|
||||
alt="Poster"
|
||||
className="h-full w-full object-cover"
|
||||
loading="lazy"
|
||||
/>
|
||||
)}
|
||||
</div>
|
||||
|
@ -0,0 +1,3 @@
|
||||
export function prettyTaskType(taskType: string) {
|
||||
return taskType.replace(/_/g, ' ');
|
||||
}
|
@ -33,6 +33,7 @@ import {
|
||||
import { format } from 'date-fns';
|
||||
import { ArrowLeft, RefreshCw } from 'lucide-react';
|
||||
import { toast } from 'sonner';
|
||||
import { prettyTaskType } from './-pretty-task-type';
|
||||
import { getStatusBadge } from './-status-badge';
|
||||
|
||||
export const Route = createFileRoute('/_app/tasks/detail/$id')({
|
||||
@ -182,7 +183,9 @@ function TaskDetailRouteComponent() {
|
||||
<div className="space-y-2">
|
||||
<Label className="font-medium text-sm">Task Type</Label>
|
||||
<div className="rounded-md bg-muted p-3">
|
||||
<Badge variant="secondary">{task.taskType}</Badge>
|
||||
<Badge variant="secondary" className="capitalize">
|
||||
{prettyTaskType(task.taskType)}
|
||||
</Badge>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
|
@ -42,6 +42,7 @@ import {
|
||||
} from '@/infra/errors/apollo';
|
||||
import { useMemo, useState } from 'react';
|
||||
import { toast } from 'sonner';
|
||||
import { prettyTaskType } from './-pretty-task-type';
|
||||
import { getStatusBadge } from './-status-badge';
|
||||
|
||||
export const Route = createFileRoute('/_app/tasks/manage')({
|
||||
@ -202,7 +203,9 @@ function TaskManageRouteComponent() {
|
||||
# {task.id}
|
||||
</div>
|
||||
<div className="flex gap-2">
|
||||
<Badge variant="outline">{task.taskType}</Badge>
|
||||
<Badge variant="outline" className="capitalize">
|
||||
{prettyTaskType(task.taskType)}
|
||||
</Badge>
|
||||
</div>
|
||||
</div>
|
||||
<div className="mt-1 flex items-center gap-2">
|
||||
|
10
justfile
10
justfile
@ -4,7 +4,7 @@ set dotenv-load := true
|
||||
prepare-dev:
|
||||
cargo install cargo-binstall
|
||||
cargo binstall sea-orm-cli cargo-llvm-cov cargo-nextest
|
||||
# <package-manager> install watchexec just zellij nasm libjxl
|
||||
# <package-manager> install watchexec just zellij nasm libjxl netcat
|
||||
|
||||
prepare-dev-testcontainers:
|
||||
docker pull linuxserver/qbittorrent:latest
|
||||
@ -17,6 +17,11 @@ dev-optimize-images:
|
||||
dev-webui:
|
||||
pnpm run --filter=webui dev
|
||||
|
||||
prod-webui:
|
||||
pnpm run --filter=webui build
|
||||
mkdir -p apps/recorder/webui
|
||||
cp -r apps/webui/dist/* apps/recorder/webui/
|
||||
|
||||
dev-proxy:
|
||||
npx --yes kill-port --port 8899,5005
|
||||
pnpm run --parallel --filter=proxy dev
|
||||
@ -24,6 +29,9 @@ dev-proxy:
|
||||
dev-recorder:
|
||||
watchexec -r -e rs,toml,yaml,json,env -- cargo run -p recorder --bin recorder_cli -- --environment=development --graceful-shutdown=false
|
||||
|
||||
prod-recorder: prod-webui
|
||||
cargo run --release -p recorder --bin recorder_cli -- --environment=production --working-dir=apps/recorder --graceful-shutdown=false
|
||||
|
||||
dev-recorder-migrate-down:
|
||||
cargo run -p recorder --bin migrate_down -- --environment development
|
||||
|
||||
|
@ -176,7 +176,7 @@ impl HttpClient {
|
||||
let accept_invalid_certs = proxy
|
||||
.accept_invalid_certs
|
||||
.as_ref()
|
||||
.map(|b| b.as_bool())
|
||||
.map(|b| *b)
|
||||
.unwrap_or_default();
|
||||
let proxy = proxy.clone().into_proxy()?;
|
||||
if let Some(proxy) = proxy {
|
||||
@ -307,7 +307,7 @@ impl HttpClient {
|
||||
let accept_invalid_certs = proxy
|
||||
.accept_invalid_certs
|
||||
.as_ref()
|
||||
.map(|b| b.as_bool())
|
||||
.map(|b| *b)
|
||||
.unwrap_or_default();
|
||||
let proxy = proxy.clone().into_proxy().unwrap_or_default();
|
||||
if let Some(proxy) = proxy {
|
||||
|
@ -2,22 +2,24 @@ use axum::http::{HeaderMap, HeaderValue};
|
||||
use reqwest::{NoProxy, Proxy};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{NoneAsEmptyString, serde_as};
|
||||
use util::BooleanLike;
|
||||
|
||||
use crate::HttpClientError;
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct HttpClientProxyConfig {
|
||||
#[serde(default)]
|
||||
#[serde_as(as = "NoneAsEmptyString")]
|
||||
pub server: Option<String>,
|
||||
#[serde(default)]
|
||||
#[serde_as(as = "NoneAsEmptyString")]
|
||||
pub auth_header: Option<String>,
|
||||
#[serde(with = "http_serde::option::header_map")]
|
||||
pub headers: Option<HeaderMap>,
|
||||
#[serde(default)]
|
||||
#[serde_as(as = "NoneAsEmptyString")]
|
||||
pub no_proxy: Option<String>,
|
||||
pub accept_invalid_certs: Option<BooleanLike>,
|
||||
pub accept_invalid_certs: Option<bool>,
|
||||
}
|
||||
|
||||
impl HttpClientProxyConfig {
|
||||
|
@ -1,5 +1,3 @@
|
||||
pub mod errors;
|
||||
pub mod loose;
|
||||
|
||||
pub use errors::OptDynErr;
|
||||
pub use loose::BooleanLike;
|
||||
|
@ -1,19 +0,0 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum BooleanLike {
|
||||
Boolean(bool),
|
||||
String(String),
|
||||
Number(i32),
|
||||
}
|
||||
|
||||
impl BooleanLike {
|
||||
pub fn as_bool(&self) -> bool {
|
||||
match self {
|
||||
BooleanLike::Boolean(b) => *b,
|
||||
BooleanLike::String(s) => s.to_lowercase() == "true",
|
||||
BooleanLike::Number(n) => *n != 0,
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user