feat: support system tasks

This commit is contained in:
master 2025-07-03 03:48:23 +08:00
parent 5b001f9584
commit 1d0aa8d7f1
44 changed files with 1833 additions and 595 deletions

View File

@ -40,9 +40,5 @@
} }
], ],
"rust-analyzer.cargo.features": "all", "rust-analyzer.cargo.features": "all",
"rust-analyzer.testExplorer": true, "rust-analyzer.testExplorer": true
// https://github.com/rust-lang/rust/issues/141540
"rust-analyzer.runnables.extraEnv": {
"CARGO_INCREMENTAL": "0",
}
} }

130
Cargo.lock generated
View File

@ -404,7 +404,7 @@ dependencies = [
"futures-util", "futures-util",
"handlebars", "handlebars",
"http", "http",
"indexmap 2.9.0", "indexmap 2.10.0",
"lru", "lru",
"mime", "mime",
"multer", "multer",
@ -474,7 +474,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34ecdaff7c9cffa3614a9f9999bf9ee4c3078fe3ce4d6a6e161736b56febf2de" checksum = "34ecdaff7c9cffa3614a9f9999bf9ee4c3078fe3ce4d6a6e161736b56febf2de"
dependencies = [ dependencies = [
"bytes", "bytes",
"indexmap 2.9.0", "indexmap 2.10.0",
"serde", "serde",
"serde_json", "serde_json",
] ]
@ -592,9 +592,9 @@ dependencies = [
[[package]] [[package]]
name = "avif-serialize" name = "avif-serialize"
version = "0.8.3" version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98922d6a4cfbcb08820c69d8eeccc05bb1f29bfa06b4f5b1dbfe9a868bd7608e" checksum = "19135c0c7a60bfee564dbe44ab5ce0557c6bf3884e5291a50be76a15640c4fbd"
dependencies = [ dependencies = [
"arrayvec", "arrayvec",
] ]
@ -1009,9 +1009,9 @@ checksum = "56ed6191a7e78c36abdb16ab65341eefd73d64d303fffccdbb00d51e4205967b"
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.18.1" version = "3.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "793db76d6187cd04dff33004d8e6c9cc4e05cd330500379d2394209271b4aeee" checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
[[package]] [[package]]
name = "bytecheck" name = "bytecheck"
@ -1672,9 +1672,9 @@ dependencies = [
[[package]] [[package]]
name = "crunchy" name = "crunchy"
version = "0.2.3" version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5"
[[package]] [[package]]
name = "crypto-bigint" name = "crypto-bigint"
@ -2781,9 +2781,9 @@ dependencies = [
[[package]] [[package]]
name = "gif" name = "gif"
version = "0.13.2" version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcc37f9a2bfe731e69f1e08d29d91d30604b9ce24bcb2880a961e82d89c6ed89" checksum = "4ae047235e33e2829703574b54fdec96bfbad892062d97fed2f76022287de61b"
dependencies = [ dependencies = [
"color_quant", "color_quant",
"weezl", "weezl",
@ -2873,9 +2873,9 @@ dependencies = [
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.4.10" version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9421a676d1b147b16b82c9225157dc629087ef8ec4d5e2960f9437a90dac0a5" checksum = "17da50a276f1e01e0ba6c029e47b7100754904ee8a278f886546e98575380785"
dependencies = [ dependencies = [
"atomic-waker", "atomic-waker",
"bytes", "bytes",
@ -2883,7 +2883,7 @@ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
"http", "http",
"indexmap 2.9.0", "indexmap 2.10.0",
"slab", "slab",
"tokio", "tokio",
"tokio-util", "tokio-util",
@ -3847,9 +3847,9 @@ dependencies = [
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "2.9.0" version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661"
dependencies = [ dependencies = [
"equivalent", "equivalent",
"hashbrown 0.15.4", "hashbrown 0.15.4",
@ -3967,6 +3967,17 @@ dependencies = [
"smallvec", "smallvec",
] ]
[[package]]
name = "io-uring"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013"
dependencies = [
"bitflags 2.9.1",
"cfg-if",
"libc",
]
[[package]] [[package]]
name = "ipnet" name = "ipnet"
version = "2.11.0" version = "2.11.0"
@ -4174,9 +4185,9 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de"
[[package]] [[package]]
name = "libredox" name = "libredox"
version = "0.1.3" version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" checksum = "1580801010e535496706ba011c15f8532df6b42297d2e471fec38ceadd8c0638"
dependencies = [ dependencies = [
"bitflags 2.9.1", "bitflags 2.9.1",
"libc", "libc",
@ -4308,7 +4319,7 @@ dependencies = [
"dashmap 6.1.0", "dashmap 6.1.0",
"futures", "futures",
"hex 0.4.3", "hex 0.4.3",
"indexmap 2.9.0", "indexmap 2.10.0",
"leaky-bucket", "leaky-bucket",
"librqbit-bencode", "librqbit-bencode",
"librqbit-clone-to-owned", "librqbit-clone-to-owned",
@ -4423,9 +4434,9 @@ dependencies = [
[[package]] [[package]]
name = "lightningcss" name = "lightningcss"
version = "1.0.0-alpha.66" version = "1.0.0-alpha.67"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a73ffa17de66534e4b527232f44aa0a89fad22c4f4e0735f9be35494f058e54" checksum = "798fba4e1205eed356b8ed7754cc3f7f04914e27855ca641409f4a532e992149"
dependencies = [ dependencies = [
"ahash 0.8.12", "ahash 0.8.12",
"bitflags 2.9.1", "bitflags 2.9.1",
@ -4435,7 +4446,7 @@ dependencies = [
"dashmap 5.5.3", "dashmap 5.5.3",
"data-encoding", "data-encoding",
"getrandom 0.2.16", "getrandom 0.2.16",
"indexmap 2.9.0", "indexmap 2.10.0",
"itertools 0.10.5", "itertools 0.10.5",
"lazy_static", "lazy_static",
"lightningcss-derive", "lightningcss-derive",
@ -5362,9 +5373,9 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]] [[package]]
name = "owo-colors" name = "owo-colors"
version = "4.2.1" version = "4.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26995317201fa17f3656c36716aed4a7c81743a9634ac4c99c0eeda495db0cec" checksum = "48dd4f4a2c8405440fd0462561f0e5806bd0f77e86f51c761481bdd4018b545e"
[[package]] [[package]]
name = "p256" name = "p256"
@ -5858,7 +5869,7 @@ dependencies = [
"either", "either",
"hashbrown 0.14.5", "hashbrown 0.14.5",
"hashbrown 0.15.4", "hashbrown 0.15.4",
"indexmap 2.9.0", "indexmap 2.10.0",
"itoa", "itoa",
"num-traits", "num-traits",
"polars-arrow", "polars-arrow",
@ -6019,7 +6030,7 @@ dependencies = [
"either", "either",
"hashbrown 0.15.4", "hashbrown 0.15.4",
"hex 0.4.3", "hex 0.4.3",
"indexmap 2.9.0", "indexmap 2.10.0",
"libm", "libm",
"memchr", "memchr",
"num-traits", "num-traits",
@ -6128,7 +6139,7 @@ version = "0.49.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ada7c7e2fbbeffbdd67628cd8a89f02b0a8d21c71d34e297e2463a7c17575203" checksum = "ada7c7e2fbbeffbdd67628cd8a89f02b0a8d21c71d34e297e2463a7c17575203"
dependencies = [ dependencies = [
"indexmap 2.9.0", "indexmap 2.10.0",
"polars-error", "polars-error",
"polars-utils", "polars-utils",
"serde", "serde",
@ -6229,7 +6240,7 @@ dependencies = [
"flate2", "flate2",
"foldhash", "foldhash",
"hashbrown 0.15.4", "hashbrown 0.15.4",
"indexmap 2.9.0", "indexmap 2.10.0",
"libc", "libc",
"memmap2 0.9.5", "memmap2 0.9.5",
"num-traits", "num-traits",
@ -6986,9 +6997,9 @@ dependencies = [
[[package]] [[package]]
name = "reqwest" name = "reqwest"
version = "0.12.20" version = "0.12.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabf4c97d9130e2bf606614eb937e86edac8292eaa6f422f995d7e8de1eb1813" checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531"
dependencies = [ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"bytes", "bytes",
@ -7440,6 +7451,18 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "schemars"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1375ba8ef45a6f15d83fa8748f1079428295d403d6ea991d09ab100155fbc06d"
dependencies = [
"dyn-clone",
"ref-cast",
"serde",
"serde_json",
]
[[package]] [[package]]
name = "scoped-tls" name = "scoped-tls"
version = "1.0.1" version = "1.0.1"
@ -7488,9 +7511,9 @@ dependencies = [
[[package]] [[package]]
name = "sea-orm" name = "sea-orm"
version = "1.1.12" version = "1.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18b7272b88bd608cd846de24f41b74a0315a135fe761b0aed4ec1ce6a6327a93" checksum = "560ea59f07472886a236e7919b9425cf16914fee1d663d3c32f1af2e922b83f0"
dependencies = [ dependencies = [
"async-stream", "async-stream",
"async-trait", "async-trait",
@ -7517,9 +7540,9 @@ dependencies = [
[[package]] [[package]]
name = "sea-orm-cli" name = "sea-orm-cli"
version = "1.1.12" version = "1.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a4961b0d9098a9dc992d6e75fb761f9e5c442bb46746eeffa08e47b53759fce" checksum = "00dd755ba3faca11692d8aaca46b68f1b4955c5dfdd6a3f1f9fba3a679a3ec1d"
dependencies = [ dependencies = [
"chrono", "chrono",
"clap", "clap",
@ -7535,9 +7558,9 @@ dependencies = [
[[package]] [[package]]
name = "sea-orm-macros" name = "sea-orm-macros"
version = "1.1.12" version = "1.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c38255a6b2e6d1ae2d5df35696507a345f03c036ae32caeb0a3b922dbab610d" checksum = "70d0ea50bb4317c8a58ed34dc410a79d685128e7b77ddcd9e8b59ae6416a56d9"
dependencies = [ dependencies = [
"heck 0.5.0", "heck 0.5.0",
"proc-macro-crate", "proc-macro-crate",
@ -7550,9 +7573,9 @@ dependencies = [
[[package]] [[package]]
name = "sea-orm-migration" name = "sea-orm-migration"
version = "1.1.12" version = "1.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82f58c3b1dcf6c137f08394f0228f9baf1574a2a799e93dc5da3cd9228bef9c5" checksum = "3e06e0f3ca090091ad58da2bc02cdb63f9afbd276baf029f065f6ff09e79cbe9"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"clap", "clap",
@ -7846,16 +7869,17 @@ dependencies = [
[[package]] [[package]]
name = "serde_with" name = "serde_with"
version = "3.13.0" version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf65a400f8f66fb7b0552869ad70157166676db75ed8181f8104ea91cf9d0b42" checksum = "f2c45cd61fefa9db6f254525d46e392b852e0e61d9a1fd36e5bd183450a556d5"
dependencies = [ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"chrono", "chrono",
"hex 0.4.3", "hex 0.4.3",
"indexmap 1.9.3", "indexmap 1.9.3",
"indexmap 2.9.0", "indexmap 2.10.0",
"schemars", "schemars 0.9.0",
"schemars 1.0.3",
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
@ -7865,9 +7889,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_with_macros" name = "serde_with_macros"
version = "3.13.0" version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81679d9ed988d5e9a5e6531dc3f2c28efbd639cbd1dfb628df08edea6004da77" checksum = "de90945e6565ce0d9a25098082ed4ee4002e047cb59892c318d66821e14bb30f"
dependencies = [ dependencies = [
"darling", "darling",
"proc-macro2", "proc-macro2",
@ -7881,7 +7905,7 @@ version = "0.9.34+deprecated"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
dependencies = [ dependencies = [
"indexmap 2.9.0", "indexmap 2.10.0",
"itoa", "itoa",
"ryu", "ryu",
"serde", "serde",
@ -8227,7 +8251,7 @@ dependencies = [
"futures-util", "futures-util",
"hashbrown 0.15.4", "hashbrown 0.15.4",
"hashlink", "hashlink",
"indexmap 2.9.0", "indexmap 2.10.0",
"log", "log",
"memchr", "memchr",
"once_cell", "once_cell",
@ -8892,17 +8916,19 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.45.1" version = "1.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" checksum = "1140bb80481756a8cbe10541f37433b459c5aa1e727b4c020fbfebdc25bf3ec4"
dependencies = [ dependencies = [
"backtrace", "backtrace",
"bytes", "bytes",
"io-uring",
"libc", "libc",
"mio 1.0.4", "mio 1.0.4",
"parking_lot 0.12.4", "parking_lot 0.12.4",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
"slab",
"socket2", "socket2",
"tokio-macros", "tokio-macros",
"windows-sys 0.52.0", "windows-sys 0.52.0",
@ -9040,7 +9066,7 @@ version = "0.22.27"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a"
dependencies = [ dependencies = [
"indexmap 2.9.0", "indexmap 2.10.0",
"serde", "serde",
"serde_spanned", "serde_spanned",
"toml_datetime", "toml_datetime",
@ -9924,9 +9950,9 @@ dependencies = [
[[package]] [[package]]
name = "windows-registry" name = "windows-registry"
version = "0.5.2" version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3bab093bdd303a1240bb99b8aba8ea8a69ee19d34c9e2ef9594e708a4878820" checksum = "5b8a9ed28765efc97bbc954883f4e6796c33a06546ebafacbabee9696967499e"
dependencies = [ dependencies = [
"windows-link", "windows-link",
"windows-result", "windows-result",
@ -10225,9 +10251,9 @@ dependencies = [
[[package]] [[package]]
name = "xattr" name = "xattr"
version = "1.5.0" version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d65cbf2f12c15564212d48f4e3dfb87923d25d611f2aed18f4cb23f0413d89e" checksum = "af3a19837351dc82ba89f8a125e22a3c475f05aba604acc023d62b2739ae2909"
dependencies = [ dependencies = [
"libc", "libc",
"rustix 1.0.7", "rustix 1.0.7",

View File

@ -13,7 +13,7 @@ name = "mikan_doppel"
path = "src/bin/mikan_doppel.rs" path = "src/bin/mikan_doppel.rs"
[dependencies] [dependencies]
recorder = { workspace = true } recorder = { workspace = true, features = ["playground"] }
tokio = { workspace = true } tokio = { workspace = true }
tracing-subscriber = { workspace = true } tracing-subscriber = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }

View File

@ -6,7 +6,7 @@ edition = "2024"
[features] [features]
default = ["jxl"] default = ["jxl"]
playground = ["dep:inquire", "dep:color-eyre", "dep:polars"] playground = ["dep:inquire", "dep:color-eyre", "dep:polars", "test-utils"]
testcontainers = [ testcontainers = [
"dep:testcontainers", "dep:testcontainers",
"dep:testcontainers-modules", "dep:testcontainers-modules",
@ -15,6 +15,7 @@ testcontainers = [
"testcontainers-modules/postgres", "testcontainers-modules/postgres",
] ]
jxl = ["dep:jpegxl-rs", "dep:jpegxl-sys"] jxl = ["dep:jpegxl-rs", "dep:jpegxl-sys"]
test-utils = []
[lib] [lib]
name = "recorder" name = "recorder"

View File

@ -131,11 +131,12 @@ impl AppBuilder {
} }
pub fn working_dir_from_manifest_dir(self) -> Self { pub fn working_dir_from_manifest_dir(self) -> Self {
let manifest_dir = if cfg!(debug_assertions) || cfg!(test) || cfg!(feature = "playground") { #[cfg(any(test, debug_assertions, feature = "test-utils"))]
env!("CARGO_MANIFEST_DIR") let manifest_dir = env!("CARGO_MANIFEST_DIR");
} else {
"./apps/recorder" #[cfg(not(any(test, debug_assertions, feature = "test-utils")))]
}; let manifest_dir = "./apps/recorder";
self.working_dir(manifest_dir.to_string()) self.working_dir(manifest_dir.to_string())
} }
} }

View File

@ -546,14 +546,12 @@ impl MikanBangumiSubscription {
#[cfg(test)] #[cfg(test)]
#[allow(unused_variables)] #[allow(unused_variables)]
mod tests { mod tests {
use std::sync::Arc;
use rstest::{fixture, rstest}; use rstest::{fixture, rstest};
use sea_orm::{ActiveModelTrait, ActiveValue, EntityTrait}; use sea_orm::{ActiveModelTrait, ActiveValue, EntityTrait};
use tracing::Level; use tracing::Level;
use crate::{ use crate::{
app::AppContextTrait,
errors::RecorderResult, errors::RecorderResult,
extract::mikan::{ extract::mikan::{
MikanBangumiHash, MikanSeasonFlowUrlMeta, MikanSeasonStr, MikanBangumiHash, MikanSeasonFlowUrlMeta, MikanSeasonStr,
@ -564,34 +562,11 @@ mod tests {
subscriptions::{self, SubscriptionTrait}, subscriptions::{self, SubscriptionTrait},
}, },
test_utils::{ test_utils::{
app::{TestingAppContext, TestingAppContextPreset}, app::TestingPreset, mikan::build_testing_mikan_credential_form,
mikan::{MikanMockServer, build_testing_mikan_credential_form},
tracing::try_init_testing_tracing, tracing::try_init_testing_tracing,
}, },
}; };
struct TestingResources {
pub app_ctx: Arc<dyn AppContextTrait>,
pub mikan_server: MikanMockServer,
}
async fn build_testing_app_context() -> RecorderResult<TestingResources> {
let mikan_server = MikanMockServer::new().await?;
let mikan_base_url = mikan_server.base_url().clone();
let app_ctx = TestingAppContext::from_preset(TestingAppContextPreset {
mikan_base_url: mikan_base_url.to_string(),
database_config: None,
})
.await?;
Ok(TestingResources {
app_ctx,
mikan_server,
})
}
#[fixture] #[fixture]
fn before_each() { fn before_each() {
try_init_testing_tracing(Level::DEBUG); try_init_testing_tracing(Level::DEBUG);
@ -600,10 +575,10 @@ mod tests {
#[rstest] #[rstest]
#[tokio::test] #[tokio::test]
async fn test_mikan_season_subscription_sync_feeds(before_each: ()) -> RecorderResult<()> { async fn test_mikan_season_subscription_sync_feeds(before_each: ()) -> RecorderResult<()> {
let TestingResources { let mut preset = TestingPreset::default().await?;
app_ctx, let app_ctx = preset.app_ctx.clone();
mut mikan_server,
} = build_testing_app_context().await?; let mikan_server = &mut preset.mikan_server;
let _resources_mock = mikan_server.mock_resources_with_doppel(); let _resources_mock = mikan_server.mock_resources_with_doppel();
@ -662,10 +637,11 @@ mod tests {
#[rstest] #[rstest]
#[tokio::test] #[tokio::test]
async fn test_mikan_subscriber_subscription_sync_feeds(before_each: ()) -> RecorderResult<()> { async fn test_mikan_subscriber_subscription_sync_feeds(before_each: ()) -> RecorderResult<()> {
let TestingResources { let mut preset = TestingPreset::default().await?;
app_ctx,
mut mikan_server, let app_ctx = preset.app_ctx.clone();
} = build_testing_app_context().await?;
let mikan_server = &mut preset.mikan_server;
let _resources_mock = mikan_server.mock_resources_with_doppel(); let _resources_mock = mikan_server.mock_resources_with_doppel();
@ -729,10 +705,11 @@ mod tests {
#[rstest] #[rstest]
#[tokio::test] #[tokio::test]
async fn test_mikan_bangumi_subscription_sync_feeds(before_each: ()) -> RecorderResult<()> { async fn test_mikan_bangumi_subscription_sync_feeds(before_each: ()) -> RecorderResult<()> {
let TestingResources { let mut preset = TestingPreset::default().await?;
app_ctx,
mut mikan_server, let app_ctx = preset.app_ctx.clone();
} = build_testing_app_context().await?;
let mikan_server = &mut preset.mikan_server;
let _resources_mock = mikan_server.mock_resources_with_doppel(); let _resources_mock = mikan_server.mock_resources_with_doppel();

View File

@ -35,7 +35,7 @@ use crate::{
EncodeWebpOptions, EncodeWebpOptions,
}, },
storage::StorageContentCategory, storage::StorageContentCategory,
task::{OptimizeImageTask, SystemTask}, task::OptimizeImageTask,
}; };
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@ -818,11 +818,14 @@ pub async fn scrape_mikan_poster_meta_from_image_url(
let webp_storage_path = storage_path.with_extension("webp"); let webp_storage_path = storage_path.with_extension("webp");
if storage_service.exists(&webp_storage_path).await?.is_none() { if storage_service.exists(&webp_storage_path).await?.is_none() {
task_service task_service
.add_system_task(SystemTask::OptimizeImage(OptimizeImageTask { .add_system_task(
source_path: storage_path.clone().to_string(), OptimizeImageTask::builder()
target_path: webp_storage_path.to_string(), .source_path(storage_path.clone().to_string())
format_options: EncodeImageOptions::Webp(EncodeWebpOptions::default()), .target_path(webp_storage_path.to_string())
})) .format_options(EncodeImageOptions::Webp(EncodeWebpOptions::default()))
.build()
.into(),
)
.await?; .await?;
} }
} }
@ -830,11 +833,14 @@ pub async fn scrape_mikan_poster_meta_from_image_url(
let avif_storage_path = storage_path.with_extension("avif"); let avif_storage_path = storage_path.with_extension("avif");
if storage_service.exists(&avif_storage_path).await?.is_none() { if storage_service.exists(&avif_storage_path).await?.is_none() {
task_service task_service
.add_system_task(SystemTask::OptimizeImage(OptimizeImageTask { .add_system_task(
source_path: storage_path.clone().to_string(), OptimizeImageTask::builder()
target_path: avif_storage_path.to_string(), .source_path(storage_path.clone().to_string())
format_options: EncodeImageOptions::Avif(EncodeAvifOptions::default()), .target_path(avif_storage_path.to_string())
})) .format_options(EncodeImageOptions::Avif(EncodeAvifOptions::default()))
.build()
.into(),
)
.await?; .await?;
} }
} }
@ -842,11 +848,14 @@ pub async fn scrape_mikan_poster_meta_from_image_url(
let jxl_storage_path = storage_path.with_extension("jxl"); let jxl_storage_path = storage_path.with_extension("jxl");
if storage_service.exists(&jxl_storage_path).await?.is_none() { if storage_service.exists(&jxl_storage_path).await?.is_none() {
task_service task_service
.add_system_task(SystemTask::OptimizeImage(OptimizeImageTask { .add_system_task(
source_path: storage_path.clone().to_string(), OptimizeImageTask::builder()
target_path: jxl_storage_path.to_string(), .source_path(storage_path.clone().to_string())
format_options: EncodeImageOptions::Jxl(EncodeJxlOptions::default()), .target_path(jxl_storage_path.to_string())
})) .format_options(EncodeImageOptions::Jxl(EncodeJxlOptions::default()))
.build()
.into(),
)
.await?; .await?;
} }
} }
@ -1089,7 +1098,7 @@ mod test {
use super::*; use super::*;
use crate::test_utils::{ use crate::test_utils::{
app::{TestingAppContext, TestingAppContextPreset}, app::{TestingAppContext, TestingPreset},
crypto::build_testing_crypto_service, crypto::build_testing_crypto_service,
database::build_testing_database_service, database::build_testing_database_service,
mikan::{ mikan::{
@ -1137,17 +1146,13 @@ mod test {
#[rstest] #[rstest]
#[tokio::test] #[tokio::test]
async fn test_scrape_mikan_poster_meta_from_image_url(before_each: ()) -> RecorderResult<()> { async fn test_scrape_mikan_poster_meta_from_image_url(before_each: ()) -> RecorderResult<()> {
let mut mikan_server = MikanMockServer::new().await?; let mut preset = TestingPreset::default().await?;
let mikan_base_url = mikan_server.base_url().clone(); let app_ctx = preset.app_ctx.clone();
let app_ctx = TestingAppContext::from_preset(TestingAppContextPreset { let mikan_base_url = preset.mikan_server.base_url().clone();
mikan_base_url: mikan_base_url.to_string(),
database_config: None,
})
.await?;
let resources_mock = mikan_server.mock_resources_with_doppel(); let resources_mock = preset.mikan_server.mock_resources_with_doppel();
let bangumi_poster_url = mikan_base_url.join("/images/Bangumi/202309/5ce9fed1.jpg")?; let bangumi_poster_url = mikan_base_url.join("/images/Bangumi/202309/5ce9fed1.jpg")?;

View File

@ -6,6 +6,7 @@ use crate::{
domains::{ domains::{
subscriber_tasks::restrict_subscriber_tasks_for_entity, subscriber_tasks::restrict_subscriber_tasks_for_entity,
subscribers::restrict_subscriber_for_entity, subscribers::restrict_subscriber_for_entity,
system_tasks::restrict_system_tasks_for_entity,
}, },
infra::{custom::register_entity_default_writable, name::get_entity_and_column_name}, infra::{custom::register_entity_default_writable, name::get_entity_and_column_name},
}, },
@ -17,6 +18,7 @@ fn skip_columns_for_entity_input(context: &mut BuilderContext) {
if matches!( if matches!(
column, column,
cron::Column::SubscriberTask cron::Column::SubscriberTask
| cron::Column::SystemTask
| cron::Column::CronExpr | cron::Column::CronExpr
| cron::Column::Enabled | cron::Column::Enabled
| cron::Column::TimeoutMs | cron::Column::TimeoutMs
@ -44,6 +46,7 @@ pub fn register_cron_to_schema_context(context: &mut BuilderContext) {
restrict_subscriber_for_entity::<cron::Entity>(context, &cron::Column::SubscriberId); restrict_subscriber_for_entity::<cron::Entity>(context, &cron::Column::SubscriberId);
restrict_subscriber_tasks_for_entity::<cron::Entity>(context, &cron::Column::SubscriberTask); restrict_subscriber_tasks_for_entity::<cron::Entity>(context, &cron::Column::SubscriberTask);
restrict_system_tasks_for_entity::<cron::Entity>(context, &cron::Column::SystemTask);
skip_columns_for_entity_input(context); skip_columns_for_entity_input(context);
} }

View File

@ -1,6 +1,7 @@
pub mod credential_3rd; pub mod credential_3rd;
pub mod bangumi; pub mod bangumi;
pub mod cron;
pub mod downloaders; pub mod downloaders;
pub mod downloads; pub mod downloads;
pub mod episodes; pub mod episodes;
@ -10,4 +11,4 @@ pub mod subscribers;
pub mod subscription_bangumi; pub mod subscription_bangumi;
pub mod subscription_episode; pub mod subscription_episode;
pub mod subscriptions; pub mod subscriptions;
pub mod cron; pub mod system_tasks;

View File

@ -30,8 +30,9 @@ use crate::{
}, },
}, },
}, },
migrations::defs::{ApalisJobs, ApalisSchema},
models::subscriber_tasks, models::subscriber_tasks,
task::{ApalisJobs, ApalisSchema, SubscriberTaskTrait}, task::SubscriberTaskTrait,
}; };
fn skip_columns_for_entity_input(context: &mut BuilderContext) { fn skip_columns_for_entity_input(context: &mut BuilderContext) {

View File

@ -0,0 +1,248 @@
use std::{ops::Deref, sync::Arc};
use async_graphql::dynamic::{FieldValue, Scalar, TypeRef};
use convert_case::Case;
use sea_orm::{
ActiveModelBehavior, ColumnTrait, ConnectionTrait, EntityTrait, Iterable, QueryFilter,
QuerySelect, QueryTrait, prelude::Expr, sea_query::Query,
};
use seaography::{
Builder as SeaographyBuilder, BuilderContext, SeaographyError, prepare_active_model,
};
use ts_rs::TS;
use crate::{
auth::AuthUserInfo,
errors::RecorderError,
graphql::{
domains::subscribers::restrict_subscriber_for_entity,
infra::{
custom::{
generate_entity_create_one_mutation_field,
generate_entity_default_basic_entity_object,
generate_entity_default_insert_input_object, generate_entity_delete_mutation_field,
generate_entity_filtered_mutation_field, register_entity_default_readonly,
},
json::{convert_jsonb_output_for_entity, restrict_jsonb_filter_input_for_entity},
name::{
get_entity_and_column_name, get_entity_basic_type_name,
get_entity_custom_mutation_field_name,
},
},
},
migrations::defs::{ApalisJobs, ApalisSchema},
models::system_tasks,
task::SystemTaskTrait,
};
fn skip_columns_for_entity_input(context: &mut BuilderContext) {
for column in system_tasks::Column::iter() {
if matches!(
column,
system_tasks::Column::Job | system_tasks::Column::SubscriberId
) {
continue;
}
let entity_column_key =
get_entity_and_column_name::<system_tasks::Entity>(context, &column);
context.entity_input.insert_skips.push(entity_column_key);
}
}
pub fn restrict_system_tasks_for_entity<T>(context: &mut BuilderContext, column: &T::Column)
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_and_column = get_entity_and_column_name::<T>(context, column);
restrict_jsonb_filter_input_for_entity::<T>(context, column);
convert_jsonb_output_for_entity::<T>(context, column, Some(Case::Camel));
let entity_column_name = get_entity_and_column_name::<T>(context, column);
context.types.input_type_overwrites.insert(
entity_column_name.clone(),
TypeRef::Named(system_tasks::SystemTask::ident().into()),
);
context.types.output_type_overwrites.insert(
entity_column_name.clone(),
TypeRef::Named(system_tasks::SystemTask::ident().into()),
);
context.types.input_conversions.insert(
entity_column_name.clone(),
Box::new(move |resolve_context, value_accessor| {
let task: system_tasks::SystemTaskInput = value_accessor.deserialize()?;
let subscriber_id = resolve_context
.data::<AuthUserInfo>()?
.subscriber_auth
.subscriber_id;
let task = system_tasks::SystemTask::from_input(task, Some(subscriber_id));
let json_value = serde_json::to_value(task).map_err(|err| {
SeaographyError::TypeConversionError(
err.to_string(),
format!("Json - {entity_column_name}"),
)
})?;
Ok(sea_orm::Value::Json(Some(Box::new(json_value))))
}),
);
context.entity_input.update_skips.push(entity_and_column);
}
pub fn register_system_tasks_to_schema_context(context: &mut BuilderContext) {
restrict_subscriber_for_entity::<system_tasks::Entity>(
context,
&system_tasks::Column::SubscriberId,
);
restrict_system_tasks_for_entity::<system_tasks::Entity>(context, &system_tasks::Column::Job);
skip_columns_for_entity_input(context);
}
/// Wires the `system_tasks` entity into the seaography schema builder.
///
/// Registers, in order:
/// - the `SystemTask` scalar and the `SystemTaskType` enumeration;
/// - the default read-only query surface for the entity;
/// - the basic entity output object;
/// - a delete mutation that removes the backing `apalis.jobs` rows;
/// - a custom `RetryOne` mutation;
/// - an insert input object plus a create-one mutation that enqueues a task.
///
/// Returns the builder with all of the above registered; the external
/// signature is pass-by-value builder in, builder out.
pub fn register_system_tasks_to_schema_builder(
    mut builder: SeaographyBuilder,
) -> SeaographyBuilder {
    // Expose the SystemTask JSON payload as a named GraphQL scalar.
    builder.schema = builder.schema.register(
        Scalar::new(system_tasks::SystemTask::ident())
            .description(system_tasks::SystemTask::decl()),
    );
    builder.register_enumeration::<system_tasks::SystemTaskType>();
    builder = register_entity_default_readonly!(builder, system_tasks);
    let builder_context = builder.context;
    {
        builder
            .outputs
            .push(generate_entity_default_basic_entity_object::<
                system_tasks::Entity,
            >(builder_context));
    }
    {
        // Delete mutation: system_tasks is a view over apalis.jobs, so the
        // delete is issued against the underlying jobs table, filtered by a
        // subquery of matching view ids.
        let delete_mutation = generate_entity_delete_mutation_field::<system_tasks::Entity>(
            builder_context,
            Arc::new(|_resolver_ctx, app_ctx, filters| {
                Box::pin(async move {
                    let db = app_ctx.db();
                    // Ids of the rows selected by the caller-provided filters.
                    let select_subquery = system_tasks::Entity::find()
                        .select_only()
                        .column(system_tasks::Column::Id)
                        .filter(filters);
                    let delete_query = Query::delete()
                        .from_table((ApalisSchema::Schema, ApalisJobs::Table))
                        .and_where(
                            Expr::col(ApalisJobs::Id).in_subquery(select_subquery.into_query()),
                        )
                        .to_owned();
                    let db_backend = db.deref().get_database_backend();
                    let delete_statement = db_backend.build(&delete_query);
                    let result = db.execute(delete_statement).await?;
                    // GraphQL result is the number of deleted rows.
                    Ok::<_, RecorderError>(result.rows_affected())
                })
            }),
        );
        builder.mutations.push(delete_mutation);
    }
    {
        // "RetryOne" mutation: resolve exactly one task id from the filters,
        // ask the task service to retry it, then return the refreshed model.
        let entity_retry_one_mutation_name = get_entity_custom_mutation_field_name::<
            system_tasks::Entity,
        >(builder_context, "RetryOne");
        let retry_one_mutation =
            generate_entity_filtered_mutation_field::<system_tasks::Entity, _, _>(
                builder_context,
                entity_retry_one_mutation_name,
                TypeRef::named_nn(get_entity_basic_type_name::<system_tasks::Entity>(
                    builder_context,
                )),
                Arc::new(|_resolver_ctx, app_ctx, filters| {
                    Box::pin(async move {
                        let db = app_ctx.db();
                        let job_id = system_tasks::Entity::find()
                            .filter(filters)
                            .select_only()
                            .column(system_tasks::Column::Id)
                            .into_tuple::<String>()
                            .one(db)
                            .await?
                            .ok_or_else(|| {
                                RecorderError::from_entity_not_found::<system_tasks::Entity>()
                            })?;
                        let task = app_ctx.task();
                        // NOTE(review): this is the *system*-task mutation but it
                        // delegates to retry_subscriber_task — confirm the task
                        // service's retry path also covers system tasks, or that
                        // a retry_system_task variant should be used here.
                        task.retry_subscriber_task(job_id.clone()).await?;
                        // Re-read the row so the response reflects post-retry state.
                        let task_model = system_tasks::Entity::find()
                            .filter(system_tasks::Column::Id.eq(&job_id))
                            .one(db)
                            .await?
                            .ok_or_else(|| {
                                RecorderError::from_entity_not_found::<system_tasks::Entity>()
                            })?;
                        Ok::<_, RecorderError>(Some(FieldValue::owned_any(task_model)))
                    })
                }),
            );
        builder.mutations.push(retry_one_mutation);
    }
    {
        builder
            .inputs
            .push(generate_entity_default_insert_input_object::<
                system_tasks::Entity,
            >(builder_context));
        // Create-one mutation: validate the input's subscriber_id against the
        // task payload, enqueue via the task service, then return the stored row.
        let create_one_mutation = generate_entity_create_one_mutation_field::<system_tasks::Entity>(
            builder_context,
            Arc::new(move |resolver_ctx, app_ctx, input_object| {
                Box::pin(async move {
                    let active_model: Result<system_tasks::ActiveModel, _> =
                        prepare_active_model(builder_context, &input_object, resolver_ctx);
                    let task_service = app_ctx.task();
                    let active_model = active_model?;
                    let db = app_ctx.db();
                    let active_model = active_model.before_save(db, true).await?;
                    // ActiveValue unwraps: assumes before_save leaves `job` and
                    // `subscriber_id` as Set(..) — TODO confirm, otherwise this panics.
                    let task = active_model.job.unwrap();
                    let subscriber_id = active_model.subscriber_id.unwrap();
                    if task.get_subscriber_id() != subscriber_id {
                        Err(async_graphql::Error::new(
                            "subscriber_id does not match with job.subscriber_id",
                        ))?;
                    }
                    let task_id = task_service.add_system_task(task).await?.to_string();
                    let db = app_ctx.db();
                    let task = system_tasks::Entity::find()
                        .filter(system_tasks::Column::Id.eq(&task_id))
                        .one(db)
                        .await?
                        .ok_or_else(|| {
                            RecorderError::from_entity_not_found::<system_tasks::Entity>()
                        })?;
                    Ok::<_, RecorderError>(task)
                })
            }),
        );
        builder.mutations.push(create_one_mutation);
    }
    builder
}

View File

@ -39,6 +39,9 @@ use crate::{
subscriptions::{ subscriptions::{
register_subscriptions_to_schema_builder, register_subscriptions_to_schema_context, register_subscriptions_to_schema_builder, register_subscriptions_to_schema_context,
}, },
system_tasks::{
register_system_tasks_to_schema_builder, register_system_tasks_to_schema_context,
},
}, },
infra::{ infra::{
json::register_jsonb_input_filter_to_schema_builder, json::register_jsonb_input_filter_to_schema_builder,
@ -79,6 +82,7 @@ pub fn build_schema(
register_subscription_episode_to_schema_context(&mut context); register_subscription_episode_to_schema_context(&mut context);
register_bangumi_to_schema_context(&mut context); register_bangumi_to_schema_context(&mut context);
register_cron_to_schema_context(&mut context); register_cron_to_schema_context(&mut context);
register_system_tasks_to_schema_context(&mut context);
} }
context context
}); });
@ -103,6 +107,7 @@ pub fn build_schema(
builder = register_subscriber_tasks_to_schema_builder(builder); builder = register_subscriber_tasks_to_schema_builder(builder);
builder = register_bangumi_to_schema_builder(builder); builder = register_bangumi_to_schema_builder(builder);
builder = register_cron_to_schema_builder(builder); builder = register_cron_to_schema_builder(builder);
builder = register_system_tasks_to_schema_builder(builder);
} }
let schema = builder.schema_builder(); let schema = builder.schema_builder();

View File

@ -27,6 +27,8 @@ pub mod migrations;
pub mod models; pub mod models;
pub mod storage; pub mod storage;
pub mod task; pub mod task;
pub mod test_utils;
pub mod utils; pub mod utils;
pub mod web; pub mod web;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;

View File

@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use ts_rs::TS;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, TS)]
#[ts(rename_all = "camelCase")]
pub enum AutoOptimizeImageFormat { pub enum AutoOptimizeImageFormat {
#[serde(rename = "image/webp")] #[serde(rename = "image/webp")]
Webp, Webp,
@ -10,25 +12,29 @@ pub enum AutoOptimizeImageFormat {
Jxl, Jxl,
} }
#[derive(Clone, Debug, Serialize, Deserialize, Default)] #[derive(Clone, Debug, Serialize, Deserialize, Default, TS, PartialEq)]
#[ts(rename_all = "camelCase")]
pub struct EncodeWebpOptions { pub struct EncodeWebpOptions {
pub quality: Option<f32>, pub quality: Option<f32>,
} }
#[derive(Clone, Debug, Serialize, Deserialize, Default)] #[derive(Clone, Debug, Serialize, Deserialize, Default, TS, PartialEq)]
#[ts(rename_all = "camelCase")]
pub struct EncodeAvifOptions { pub struct EncodeAvifOptions {
pub quality: Option<u8>, pub quality: Option<u8>,
pub speed: Option<u8>, pub speed: Option<u8>,
pub threads: Option<u8>, pub threads: Option<u8>,
} }
#[derive(Clone, Debug, Serialize, Deserialize, Default)] #[derive(Clone, Debug, Serialize, Deserialize, Default, TS, PartialEq)]
#[ts(rename_all = "camelCase")]
pub struct EncodeJxlOptions { pub struct EncodeJxlOptions {
pub quality: Option<f32>, pub quality: Option<f32>,
pub speed: Option<u8>, pub speed: Option<u8>,
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize, TS, PartialEq)]
#[ts(tag = "mimeType")]
#[serde(tag = "mime_type")] #[serde(tag = "mime_type")]
pub enum EncodeImageOptions { pub enum EncodeImageOptions {
#[serde(rename = "image/webp")] #[serde(rename = "image/webp")]

View File

@ -190,6 +190,37 @@ pub enum Cron {
Priority, Priority,
Status, Status,
SubscriberTask, SubscriberTask,
SystemTask,
}
/// Identifier for the dedicated PostgreSQL schema that apalis keeps its
/// job tables in (renders as `apalis` in generated SQL).
#[derive(sea_query::Iden)]
pub enum ApalisSchema {
#[iden = "apalis"]
Schema,
}
#[derive(DeriveIden)]
pub enum ApalisJobs {
#[sea_orm(iden = "jobs")]
Table,
SubscriberId,
SubscriptionId,
Job,
JobType,
Status,
TaskType,
Id,
Attempts,
MaxAttempts,
RunAt,
LastError,
LockAt,
LockBy,
DoneAt,
Priority,
CronId,
} }
macro_rules! create_postgres_enum_for_active_enum { macro_rules! create_postgres_enum_for_active_enum {

View File

@ -0,0 +1,219 @@
use async_trait::async_trait;
use sea_orm_migration::{prelude::*, schema::*};
use super::defs::{ApalisJobs, ApalisSchema};
use crate::{
migrations::defs::{Subscribers, Subscriptions},
task::{
SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_FUNCTION_NAME,
SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_TRIGGER_NAME, SUBSCRIBER_TASK_APALIS_NAME,
SYSTEM_TASK_APALIS_NAME,
},
};
/// Migration: extend `apalis.jobs` with task bookkeeping columns, a sync
/// trigger, and the `subscriber_tasks` / `system_tasks` views.
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
TableAlterStatement::new()
.table((ApalisSchema::Schema, ApalisJobs::Table))
.add_column_if_not_exists(integer_null(ApalisJobs::SubscriberId))
.add_column_if_not_exists(integer_null(ApalisJobs::SubscriptionId))
.add_column_if_not_exists(string_null(ApalisJobs::TaskType))
.add_foreign_key(
TableForeignKey::new()
.name("fk_apalis_jobs_subscriber_id")
.from_tbl((ApalisSchema::Schema, ApalisJobs::Table))
.from_col(ApalisJobs::SubscriberId)
.to_tbl(Subscribers::Table)
.to_col(Subscribers::Id)
.on_delete(ForeignKeyAction::Cascade)
.on_update(ForeignKeyAction::Restrict),
)
.add_foreign_key(
TableForeignKey::new()
.name("fk_apalis_jobs_subscription_id")
.from_tbl((ApalisSchema::Schema, ApalisJobs::Table))
.from_col(ApalisJobs::SubscriptionId)
.to_tbl(Subscriptions::Table)
.to_col(Subscriptions::Id)
.on_delete(ForeignKeyAction::Cascade)
.on_update(ForeignKeyAction::Restrict),
)
.to_owned(),
)
.await?;
let db = manager.get_connection();
db.execute_unprepared(&format!(
r#"UPDATE {apalis_schema}.{apalis_table} SET {subscriber_id} = ({job} ->> '{subscriber_id}')::integer, {task_type} = ({job} ->> '{task_type}')::text, {subscription_id} = ({job} ->> '{subscription_id}')::integer"#,
apalis_schema = ApalisSchema::Schema.to_string(),
apalis_table = ApalisJobs::Table.to_string(),
subscriber_id = ApalisJobs::SubscriberId.to_string(),
job = ApalisJobs::Job.to_string(),
task_type = ApalisJobs::TaskType.to_string(),
subscription_id = ApalisJobs::SubscriptionId.to_string(),
)).await?;
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE FUNCTION {SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_FUNCTION_NAME}() RETURNS trigger AS $$
DECLARE
new_job_subscriber_id integer;
new_job_subscription_id integer;
new_job_task_type text;
BEGIN
new_job_subscriber_id = (NEW.{job} ->> '{subscriber_id}')::integer;
new_job_subscription_id = (NEW.{job} ->> '{subscription_id}')::integer;
new_job_task_type = (NEW.{job} ->> '{task_type}')::text;
IF new_job_subscriber_id != (OLD.{job} ->> '{subscriber_id}')::integer AND new_job_subscriber_id != NEW.{subscriber_id} THEN
NEW.{subscriber_id} = new_job_subscriber_id;
END IF;
IF new_job_subscription_id != (OLD.{job} ->> '{subscription_id}')::integer AND new_job_subscription_id != NEW.{subscription_id} THEN
NEW.{subscription_id} = new_job_subscription_id;
END IF;
IF new_job_task_type != (OLD.{job} ->> '{task_type}')::text AND new_job_task_type != NEW.{task_type} THEN
NEW.{task_type} = new_job_task_type;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;"#,
job = ApalisJobs::Job.to_string(),
subscriber_id = ApalisJobs::SubscriberId.to_string(),
subscription_id = ApalisJobs::SubscriptionId.to_string(),
task_type = ApalisJobs::TaskType.to_string(),
)).await?;
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE TRIGGER {SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_TRIGGER_NAME}
BEFORE INSERT OR UPDATE ON {apalis_schema}.{apalis_table}
FOR EACH ROW
EXECUTE FUNCTION {SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_FUNCTION_NAME}();"#,
apalis_schema = ApalisSchema::Schema.to_string(),
apalis_table = ApalisJobs::Table.to_string()
))
.await?;
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE VIEW subscriber_tasks AS
SELECT
{job},
{job_type},
{status},
{subscriber_id},
{task_type},
{id},
{attempts},
{max_attempts},
{run_at},
{last_error},
{lock_at},
{lock_by},
{done_at},
{priority},
{subscription_id}
FROM {apalis_schema}.{apalis_table}
WHERE {job_type} = '{SUBSCRIBER_TASK_APALIS_NAME}'
AND jsonb_path_exists({job}, '$.{subscriber_id} ? (@.type() == "number")')
AND jsonb_path_exists({job}, '$.{task_type} ? (@.type() == "string")')"#,
apalis_schema = ApalisSchema::Schema.to_string(),
apalis_table = ApalisJobs::Table.to_string(),
job = ApalisJobs::Job.to_string(),
job_type = ApalisJobs::JobType.to_string(),
status = ApalisJobs::Status.to_string(),
subscriber_id = ApalisJobs::SubscriberId.to_string(),
task_type = ApalisJobs::TaskType.to_string(),
id = ApalisJobs::Id.to_string(),
attempts = ApalisJobs::Attempts.to_string(),
max_attempts = ApalisJobs::MaxAttempts.to_string(),
run_at = ApalisJobs::RunAt.to_string(),
last_error = ApalisJobs::LastError.to_string(),
lock_at = ApalisJobs::LockAt.to_string(),
lock_by = ApalisJobs::LockBy.to_string(),
done_at = ApalisJobs::DoneAt.to_string(),
priority = ApalisJobs::Priority.to_string(),
subscription_id = ApalisJobs::SubscriptionId.to_string(),
))
.await?;
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE VIEW system_tasks AS
SELECT
{job},
{job_type},
{status},
{subscriber_id},
{task_type},
{id},
{attempts},
{max_attempts},
{run_at},
{last_error},
{lock_at},
{lock_by},
{done_at},
{priority}
FROM {apalis_schema}.{apalis_table}
WHERE {job_type} = '{SYSTEM_TASK_APALIS_NAME}'
AND jsonb_path_exists({job}, '$.{task_type} ? (@.type() == "string")')"#,
apalis_schema = ApalisSchema::Schema.to_string(),
apalis_table = ApalisJobs::Table.to_string(),
job = ApalisJobs::Job.to_string(),
job_type = ApalisJobs::JobType.to_string(),
status = ApalisJobs::Status.to_string(),
subscriber_id = ApalisJobs::SubscriberId.to_string(),
task_type = ApalisJobs::TaskType.to_string(),
id = ApalisJobs::Id.to_string(),
attempts = ApalisJobs::Attempts.to_string(),
max_attempts = ApalisJobs::MaxAttempts.to_string(),
run_at = ApalisJobs::RunAt.to_string(),
last_error = ApalisJobs::LastError.to_string(),
lock_at = ApalisJobs::LockAt.to_string(),
lock_by = ApalisJobs::LockBy.to_string(),
done_at = ApalisJobs::DoneAt.to_string(),
priority = ApalisJobs::Priority.to_string(),
))
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
db.execute_unprepared("DROP VIEW IF EXISTS subscriber_tasks")
.await?;
db.execute_unprepared("DROP VIEW IF EXISTS system_tasks")
.await?;
db.execute_unprepared(&format!(
r#"DROP TRIGGER IF EXISTS {SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_TRIGGER_NAME} ON {apalis_schema}.{apalis_table}"#,
apalis_schema = ApalisSchema::Schema.to_string(),
apalis_table = ApalisJobs::Table.to_string()
)).await?;
db.execute_unprepared(&format!(
r#"DROP FUNCTION IF EXISTS {SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_FUNCTION_NAME}()"#,
))
.await?;
manager
.alter_table(
TableAlterStatement::new()
.table((ApalisSchema::Schema, ApalisJobs::Table))
.drop_foreign_key("fk_apalis_jobs_subscriber_id")
.drop_foreign_key("fk_apalis_jobs_subscription_id")
.drop_column(ApalisJobs::SubscriberId)
.drop_column(ApalisJobs::SubscriptionId)
.to_owned(),
)
.await?;
Ok(())
}
}

View File

@ -1,64 +0,0 @@
use async_trait::async_trait;
use sea_orm_migration::prelude::*;
use crate::task::SUBSCRIBER_TASK_APALIS_NAME;
/// Migration: create the `subscriber_tasks` view over `apalis.jobs` plus a
/// partial index on the payload's `subscriber_id`.
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE VIEW subscriber_tasks AS
SELECT
job,
job_type,
status,
(job ->> 'subscriber_id'::text)::integer AS subscriber_id,
job ->> 'task_type'::text AS task_type,
id,
attempts,
max_attempts,
run_at,
last_error,
lock_at,
lock_by,
done_at,
priority
FROM apalis.jobs
WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}'
AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#,
))
.await?;
db.execute_unprepared(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_apalis_jobs_subscriber_id
ON apalis.jobs (((job -> 'subscriber_id')::integer))
WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}'
AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#
))
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
db.execute_unprepared(
r#"DROP INDEX IF EXISTS idx_apalis_jobs_subscriber_id
ON apalis.jobs"#,
)
.await?;
db.execute_unprepared("DROP VIEW IF EXISTS subscriber_tasks")
.await?;
Ok(())
}
}

View File

@ -1,62 +0,0 @@
use async_trait::async_trait;
use sea_orm_migration::prelude::*;
use crate::task::SUBSCRIBER_TASK_APALIS_NAME;
/// Migration: extend the `subscriber_tasks` view with `subscription_id` and
/// add a matching partial index on the payload.
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE VIEW subscriber_tasks AS
SELECT
job,
job_type,
status,
(job ->> 'subscriber_id')::integer AS subscriber_id,
job ->> 'task_type' AS task_type,
id,
attempts,
max_attempts,
run_at,
last_error,
lock_at,
lock_by,
done_at,
priority,
(job ->> 'subscription_id')::integer AS subscription_id
FROM apalis.jobs
WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}'
AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#,
))
.await?;
db.execute_unprepared(&format!(
r#"CREATE INDEX IF NOT EXISTS idx_apalis_jobs_subscription_id
ON apalis.jobs (((job -> 'subscription_id')::integer))
WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}'
AND jsonb_path_exists(job, '$.subscription_id ? (@.type() == "number")')
AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#
))
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
db.execute_unprepared(
r#"DROP INDEX IF EXISTS idx_apalis_jobs_subscription_id
ON apalis.jobs"#,
)
.await?;
Ok(())
}
}

View File

@ -4,13 +4,18 @@ use sea_orm_migration::{prelude::*, schema::*};
use crate::{ use crate::{
migrations::defs::{ migrations::defs::{
Cron, CustomSchemaManagerExt, GeneralIds, Subscribers, Subscriptions, table_auto_z, ApalisJobs, ApalisSchema, Cron, CustomSchemaManagerExt, GeneralIds, Subscribers,
Subscriptions, table_auto_z,
}, },
models::cron::{ models::cron::{
CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, CronStatus, CronStatusEnum, CHECK_AND_TRIGGER_DUE_CRONS_FUNCTION_NAME, CRON_DUE_EVENT, CronStatus, CronStatusEnum,
NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_FUNCTION_NAME, NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME,
SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME, SETUP_CRON_EXTRA_FOREIGN_KEYS_TRIGGER_NAME,
}, },
task::{
SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_FUNCTION_NAME, SUBSCRIBER_TASK_APALIS_NAME,
SYSTEM_TASK_APALIS_NAME,
},
}; };
#[derive(DeriveMigrationName)] #[derive(DeriveMigrationName)]
@ -52,6 +57,7 @@ impl MigrationTrait for Migration {
CronStatus::iden_values(), CronStatus::iden_values(),
)) ))
.col(json_binary_null(Cron::SubscriberTask)) .col(json_binary_null(Cron::SubscriberTask))
.col(json_binary_null(Cron::SystemTask))
.foreign_key( .foreign_key(
ForeignKey::create() ForeignKey::create()
.name("fk_cron_subscriber_id") .name("fk_cron_subscriber_id")
@ -91,12 +97,22 @@ impl MigrationTrait for Migration {
db.execute_unprepared(&format!( db.execute_unprepared(&format!(
r#"CREATE OR REPLACE FUNCTION {SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME}() RETURNS trigger AS $$ r#"CREATE OR REPLACE FUNCTION {SETUP_CRON_EXTRA_FOREIGN_KEYS_FUNCTION_NAME}() RETURNS trigger AS $$
DECLARE
new_subscriber_task_subscriber_id integer;
new_subscriber_task_subscription_id integer;
new_system_task_subscriber_id integer;
BEGIN BEGIN
IF jsonb_path_exists(NEW.{subscriber_task}, '$.subscriber_id ? (@.type() == "number")') THEN new_subscriber_task_subscriber_id = (NEW.{subscriber_task} ->> 'subscriber_id')::integer;
NEW.{subscriber_id} = (NEW.{subscriber_task} ->> 'subscriber_id')::integer; new_subscriber_task_subscription_id = (NEW.{subscriber_task} ->> 'subscription_id')::integer;
new_system_task_subscriber_id = (NEW.{system_task} ->> 'subscriber_id')::integer;
IF new_subscriber_task_subscriber_id != (OLD.{subscriber_task} ->> 'subscriber_id')::integer AND new_subscriber_task_subscriber_id != NEW.{subscriber_id} THEN
NEW.{subscriber_id} = new_subscriber_task_subscriber_id;
END IF; END IF;
IF jsonb_path_exists(NEW.{subscriber_task}, '$.subscription_id ? (@.type() == "number")') THEN IF new_subscriber_task_subscription_id != (OLD.{subscriber_task} ->> 'subscription_id')::integer AND new_subscriber_task_subscription_id != NEW.{subscription_id} THEN
NEW.{subscription_id} = (NEW.{subscriber_task} ->> 'subscription_id')::integer; NEW.{subscription_id} = new_subscriber_task_subscription_id;
END IF;
IF new_system_task_subscriber_id != (OLD.{system_task} ->> 'subscriber_id')::integer AND new_system_task_subscriber_id != NEW.{subscriber_id} THEN
NEW.{subscriber_id} = new_system_task_subscriber_id;
END IF; END IF;
RETURN NEW; RETURN NEW;
END; END;
@ -104,6 +120,7 @@ impl MigrationTrait for Migration {
subscriber_task = &Cron::SubscriberTask.to_string(), subscriber_task = &Cron::SubscriberTask.to_string(),
subscriber_id = &Cron::SubscriberId.to_string(), subscriber_id = &Cron::SubscriberId.to_string(),
subscription_id = &Cron::SubscriptionId.to_string(), subscription_id = &Cron::SubscriptionId.to_string(),
system_task = &Cron::SystemTask.to_string(),
)).await?; )).await?;
db.execute_unprepared(&format!( db.execute_unprepared(&format!(
@ -208,12 +225,280 @@ impl MigrationTrait for Migration {
)) ))
.await?; .await?;
manager
.alter_table(
TableAlterStatement::new()
.table((ApalisSchema::Schema, ApalisJobs::Table))
.add_column_if_not_exists(integer_null(ApalisJobs::CronId))
.add_foreign_key(
TableForeignKey::new()
.name("fk_apalis_jobs_cron_id")
.from_tbl((ApalisSchema::Schema, ApalisJobs::Table))
.from_col(ApalisJobs::CronId)
.to_tbl(Cron::Table)
.to_col(Cron::Id)
.on_delete(ForeignKeyAction::NoAction)
.on_update(ForeignKeyAction::NoAction),
)
.to_owned(),
)
.await?;
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE VIEW subscriber_tasks AS
SELECT
{job},
{job_type},
{status},
{subscriber_id},
{task_type},
{id},
{attempts},
{max_attempts},
{run_at},
{last_error},
{lock_at},
{lock_by},
{done_at},
{priority},
{subscription_id},
{cron_id}
FROM {apalis_schema}.{apalis_table}
WHERE {job_type} = '{SUBSCRIBER_TASK_APALIS_NAME}'
AND jsonb_path_exists({job}, '$.{subscriber_id} ? (@.type() == "number")')
AND jsonb_path_exists({job}, '$.{task_type} ? (@.type() == "string")')"#,
apalis_schema = ApalisSchema::Schema.to_string(),
apalis_table = ApalisJobs::Table.to_string(),
job = ApalisJobs::Job.to_string(),
job_type = ApalisJobs::JobType.to_string(),
status = ApalisJobs::Status.to_string(),
subscriber_id = ApalisJobs::SubscriberId.to_string(),
task_type = ApalisJobs::TaskType.to_string(),
id = ApalisJobs::Id.to_string(),
attempts = ApalisJobs::Attempts.to_string(),
max_attempts = ApalisJobs::MaxAttempts.to_string(),
run_at = ApalisJobs::RunAt.to_string(),
last_error = ApalisJobs::LastError.to_string(),
lock_at = ApalisJobs::LockAt.to_string(),
lock_by = ApalisJobs::LockBy.to_string(),
done_at = ApalisJobs::DoneAt.to_string(),
priority = ApalisJobs::Priority.to_string(),
subscription_id = ApalisJobs::SubscriptionId.to_string(),
cron_id = ApalisJobs::CronId.to_string(),
))
.await?;
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE VIEW system_tasks AS
SELECT
{job},
{job_type},
{status},
{subscriber_id},
{task_type},
{id},
{attempts},
{max_attempts},
{run_at},
{last_error},
{lock_at},
{lock_by},
{done_at},
{priority},
{cron_id}
FROM {apalis_schema}.{apalis_table}
WHERE {job_type} = '{SYSTEM_TASK_APALIS_NAME}'
AND jsonb_path_exists({job}, '$.{task_type} ? (@.type() == "string")')"#,
apalis_schema = ApalisSchema::Schema.to_string(),
apalis_table = ApalisJobs::Table.to_string(),
job = ApalisJobs::Job.to_string(),
job_type = ApalisJobs::JobType.to_string(),
status = ApalisJobs::Status.to_string(),
subscriber_id = ApalisJobs::SubscriberId.to_string(),
task_type = ApalisJobs::TaskType.to_string(),
id = ApalisJobs::Id.to_string(),
attempts = ApalisJobs::Attempts.to_string(),
max_attempts = ApalisJobs::MaxAttempts.to_string(),
run_at = ApalisJobs::RunAt.to_string(),
last_error = ApalisJobs::LastError.to_string(),
lock_at = ApalisJobs::LockAt.to_string(),
lock_by = ApalisJobs::LockBy.to_string(),
done_at = ApalisJobs::DoneAt.to_string(),
priority = ApalisJobs::Priority.to_string(),
cron_id = ApalisJobs::CronId.to_string(),
))
.await?;
db.execute_unprepared(&format!(
r#"
UPDATE {apalis_schema}.{apalis_table} SET {cron_id} = ({job} ->> '{cron_id}')::integer
"#,
apalis_schema = ApalisSchema::Schema.to_string(),
apalis_table = ApalisJobs::Table.to_string(),
job = ApalisJobs::Job.to_string(),
cron_id = ApalisJobs::CronId.to_string(),
))
.await?;
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE FUNCTION {SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_FUNCTION_NAME}() RETURNS trigger AS $$
DECLARE
new_job_subscriber_id integer;
new_job_subscription_id integer;
new_job_cron_id integer;
new_job_task_type text;
BEGIN
new_job_subscriber_id = (NEW.{job} ->> '{subscriber_id}')::integer;
new_job_subscription_id = (NEW.{job} ->> '{subscription_id}')::integer;
new_job_cron_id = (NEW.{job} ->> '{cron_id}')::integer;
new_job_task_type = (NEW.{job} ->> '{task_type}')::text;
IF new_job_subscriber_id != (OLD.{job} ->> '{subscriber_id}')::integer AND new_job_subscriber_id != NEW.{subscriber_id} THEN
NEW.{subscriber_id} = new_job_subscriber_id;
END IF;
IF new_job_subscription_id != (OLD.{job} ->> '{subscription_id}')::integer AND new_job_subscription_id != NEW.{subscription_id} THEN
NEW.{subscription_id} = new_job_subscription_id;
END IF;
IF new_job_cron_id != (OLD.{job} ->> '{cron_id}')::integer AND new_job_cron_id != NEW.{cron_id} THEN
NEW.{cron_id} = new_job_cron_id;
END IF;
IF new_job_task_type != (OLD.{job} ->> '{task_type}')::text AND new_job_task_type != NEW.{task_type} THEN
NEW.{task_type} = new_job_task_type;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;"#,
job = ApalisJobs::Job.to_string(),
subscriber_id = ApalisJobs::SubscriberId.to_string(),
subscription_id = ApalisJobs::SubscriptionId.to_string(),
cron_id = ApalisJobs::CronId.to_string(),
task_type = ApalisJobs::TaskType.to_string(),
)).await?;
Ok(()) Ok(())
} }
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection(); let db = manager.get_connection();
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE FUNCTION {SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_FUNCTION_NAME}() RETURNS trigger AS $$
DECLARE
new_job_subscriber_id integer;
new_job_subscription_id integer;
new_job_task_type text;
BEGIN
new_job_subscriber_id = (NEW.{job} ->> '{subscriber_id}')::integer;
new_job_subscription_id = (NEW.{job} ->> '{subscription_id}')::integer;
new_job_task_type = (NEW.{job} ->> '{task_type}')::text;
IF new_job_subscriber_id != (OLD.{job} ->> '{subscriber_id}')::integer AND new_job_subscriber_id != NEW.{subscriber_id} THEN
NEW.{subscriber_id} = new_job_subscriber_id;
END IF;
IF new_job_subscription_id != (OLD.{job} ->> '{subscription_id}')::integer AND new_job_subscription_id != NEW.{subscription_id} THEN
NEW.{subscription_id} = new_job_subscription_id;
END IF;
IF new_job_task_type != (OLD.{job} ->> '{task_type}')::text AND new_job_task_type != NEW.{task_type} THEN
NEW.{task_type} = new_job_task_type;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;"#,
job = ApalisJobs::Job.to_string(),
subscriber_id = ApalisJobs::SubscriberId.to_string(),
subscription_id = ApalisJobs::SubscriptionId.to_string(),
task_type = ApalisJobs::TaskType.to_string(),
)).await?;
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE VIEW subscriber_tasks AS
SELECT
{job},
{job_type},
{status},
{subscriber_id},
{task_type},
{id},
{attempts},
{max_attempts},
{run_at},
{last_error},
{lock_at},
{lock_by},
{done_at},
{priority},
{subscription_id}
FROM {apalis_schema}.{apalis_table}
WHERE {job_type} = '{SUBSCRIBER_TASK_APALIS_NAME}'
AND jsonb_path_exists({job}, '$.{subscriber_id} ? (@.type() == "number")')
AND jsonb_path_exists({job}, '$.{task_type} ? (@.type() == "string")')"#,
apalis_schema = ApalisSchema::Schema.to_string(),
apalis_table = ApalisJobs::Table.to_string(),
job = ApalisJobs::Job.to_string(),
job_type = ApalisJobs::JobType.to_string(),
status = ApalisJobs::Status.to_string(),
subscriber_id = ApalisJobs::SubscriberId.to_string(),
task_type = ApalisJobs::TaskType.to_string(),
id = ApalisJobs::Id.to_string(),
attempts = ApalisJobs::Attempts.to_string(),
max_attempts = ApalisJobs::MaxAttempts.to_string(),
run_at = ApalisJobs::RunAt.to_string(),
last_error = ApalisJobs::LastError.to_string(),
lock_at = ApalisJobs::LockAt.to_string(),
lock_by = ApalisJobs::LockBy.to_string(),
done_at = ApalisJobs::DoneAt.to_string(),
priority = ApalisJobs::Priority.to_string(),
subscription_id = ApalisJobs::SubscriptionId.to_string(),
))
.await?;
db.execute_unprepared(&format!(
r#"CREATE OR REPLACE VIEW system_tasks AS
SELECT
{job},
{job_type},
{status},
{subscriber_id},
{task_type},
{id},
{attempts},
{max_attempts},
{run_at},
{last_error},
{lock_at},
{lock_by},
{done_at},
{priority}
FROM {apalis_schema}.{apalis_table}
WHERE {job_type} = '{SYSTEM_TASK_APALIS_NAME}'
AND jsonb_path_exists({job}, '$.{task_type} ? (@.type() == "string")')"#,
apalis_schema = ApalisSchema::Schema.to_string(),
apalis_table = ApalisJobs::Table.to_string(),
job = ApalisJobs::Job.to_string(),
job_type = ApalisJobs::JobType.to_string(),
status = ApalisJobs::Status.to_string(),
subscriber_id = ApalisJobs::SubscriberId.to_string(),
task_type = ApalisJobs::TaskType.to_string(),
id = ApalisJobs::Id.to_string(),
attempts = ApalisJobs::Attempts.to_string(),
max_attempts = ApalisJobs::MaxAttempts.to_string(),
run_at = ApalisJobs::RunAt.to_string(),
last_error = ApalisJobs::LastError.to_string(),
lock_at = ApalisJobs::LockAt.to_string(),
lock_by = ApalisJobs::LockBy.to_string(),
done_at = ApalisJobs::DoneAt.to_string(),
priority = ApalisJobs::Priority.to_string(),
))
.await?;
manager
.alter_table(
TableAlterStatement::new()
.table((ApalisSchema::Schema, ApalisJobs::Table))
.drop_column(ApalisJobs::CronId)
.drop_foreign_key("fk_apalis_jobs_cron_id")
.to_owned(),
)
.await?;
db.execute_unprepared(&format!( db.execute_unprepared(&format!(
r#"DROP TRIGGER IF EXISTS {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME} ON {table};"#, r#"DROP TRIGGER IF EXISTS {NOTIFY_DUE_CRON_WHEN_MUTATING_TRIGGER_NAME} ON {table};"#,
table = &Cron::Table.to_string(), table = &Cron::Table.to_string(),

View File

@ -7,10 +7,9 @@ pub mod m20220101_000001_init;
pub mod m20240224_082543_add_downloads; pub mod m20240224_082543_add_downloads;
pub mod m20241231_000001_auth; pub mod m20241231_000001_auth;
pub mod m20250501_021523_credential_3rd; pub mod m20250501_021523_credential_3rd;
pub mod m20250520_021135_subscriber_tasks; pub mod m20250520_021135_add_tasks;
pub mod m20250622_015618_feeds; pub mod m20250622_015618_feeds;
pub mod m20250622_020819_bangumi_and_episode_type; pub mod m20250622_020819_bangumi_and_episode_type;
pub mod m20250625_060701_add_subscription_id_to_subscriber_tasks;
pub mod m20250629_065628_add_cron; pub mod m20250629_065628_add_cron;
pub struct Migrator; pub struct Migrator;
@ -23,10 +22,9 @@ impl MigratorTrait for Migrator {
Box::new(m20240224_082543_add_downloads::Migration), Box::new(m20240224_082543_add_downloads::Migration),
Box::new(m20241231_000001_auth::Migration), Box::new(m20241231_000001_auth::Migration),
Box::new(m20250501_021523_credential_3rd::Migration), Box::new(m20250501_021523_credential_3rd::Migration),
Box::new(m20250520_021135_subscriber_tasks::Migration), Box::new(m20250520_021135_add_tasks::Migration),
Box::new(m20250622_015618_feeds::Migration), Box::new(m20250622_015618_feeds::Migration),
Box::new(m20250622_020819_bangumi_and_episode_type::Migration), Box::new(m20250622_020819_bangumi_and_episode_type::Migration),
Box::new(m20250625_060701_add_subscription_id_to_subscriber_tasks::Migration),
Box::new(m20250629_065628_add_cron::Migration), Box::new(m20250629_065628_add_cron::Migration),
] ]
} }

View File

@ -21,8 +21,10 @@ use sea_orm::{
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::{ use crate::{
app::AppContextTrait, errors::RecorderResult, models::subscriber_tasks, app::AppContextTrait,
task::SubscriberTaskTrait, errors::RecorderResult,
models::{subscriber_tasks, system_tasks},
task::{SubscriberTaskTrait, SystemTaskTrait},
}; };
#[derive( #[derive(
@ -41,7 +43,7 @@ pub enum CronStatus {
Failed, Failed,
} }
#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)] #[derive(Debug, Clone, DeriveEntityModel, PartialEq, Serialize, Deserialize)]
#[sea_orm(table_name = "cron")] #[sea_orm(table_name = "cron")]
pub struct Model { pub struct Model {
#[sea_orm(default_expr = "Expr::current_timestamp()")] #[sea_orm(default_expr = "Expr::current_timestamp()")]
@ -70,6 +72,7 @@ pub struct Model {
#[sea_orm(default_expr = "true")] #[sea_orm(default_expr = "true")]
pub enabled: bool, pub enabled: bool,
pub subscriber_task: Option<subscriber_tasks::SubscriberTask>, pub subscriber_task: Option<subscriber_tasks::SubscriberTask>,
pub system_task: Option<system_tasks::SystemTask>,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@ -79,7 +82,7 @@ pub enum Relation {
from = "Column::SubscriberId", from = "Column::SubscriberId",
to = "super::subscribers::Column::Id", to = "super::subscribers::Column::Id",
on_update = "Cascade", on_update = "Cascade",
on_delete = "Cascade" on_delete = "Restrict"
)] )]
Subscriber, Subscriber,
#[sea_orm( #[sea_orm(
@ -87,9 +90,13 @@ pub enum Relation {
from = "Column::SubscriptionId", from = "Column::SubscriptionId",
to = "super::subscriptions::Column::Id", to = "super::subscriptions::Column::Id",
on_update = "Cascade", on_update = "Cascade",
on_delete = "Cascade" on_delete = "Restrict"
)] )]
Subscription, Subscription,
#[sea_orm(has_many = "super::subscriber_tasks::Entity")]
SubscriberTask,
#[sea_orm(has_many = "super::system_tasks::Entity")]
SystemTask,
} }
impl Related<super::subscribers::Entity> for Entity { impl Related<super::subscribers::Entity> for Entity {
@ -104,12 +111,28 @@ impl Related<super::subscriptions::Entity> for Entity {
} }
} }
impl Related<super::subscriber_tasks::Entity> for Entity {
fn to() -> RelationDef {
Relation::SubscriberTask.def()
}
}
impl Related<super::system_tasks::Entity> for Entity {
fn to() -> RelationDef {
Relation::SystemTask.def()
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
pub enum RelatedEntity { pub enum RelatedEntity {
#[sea_orm(entity = "super::subscribers::Entity")] #[sea_orm(entity = "super::subscribers::Entity")]
Subscriber, Subscriber,
#[sea_orm(entity = "super::subscriptions::Entity")] #[sea_orm(entity = "super::subscriptions::Entity")]
Subscription, Subscription,
#[sea_orm(entity = "super::subscriber_tasks::Entity")]
SubscriberTask,
#[sea_orm(entity = "super::system_tasks::Entity")]
SystemTask,
} }
#[async_trait] #[async_trait]
@ -136,6 +159,14 @@ impl ActiveModelBehavior for ActiveModel {
"Cron subscriber_id does not match subscriber_task.subscriber_id".to_string(), "Cron subscriber_id does not match subscriber_task.subscriber_id".to_string(),
)); ));
} }
if let ActiveValue::Set(Some(subscriber_id)) = self.subscriber_id
&& let ActiveValue::Set(Some(ref system_task)) = self.system_task
&& system_task.get_subscriber_id() != Some(subscriber_id)
{
return Err(DbErr::Custom(
"Cron subscriber_id does not match system_task.subscriber_id".to_string(),
));
}
Ok(self) Ok(self)
} }
@ -219,11 +250,18 @@ impl Model {
async fn exec_cron(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> { async fn exec_cron(&self, ctx: &dyn AppContextTrait) -> RecorderResult<()> {
if let Some(subscriber_task) = self.subscriber_task.as_ref() { if let Some(subscriber_task) = self.subscriber_task.as_ref() {
let task_service = ctx.task(); let task_service = ctx.task();
let mut new_subscriber_task = subscriber_task.clone();
new_subscriber_task.set_cron_id(Some(self.id));
task_service task_service
.add_subscriber_task(subscriber_task.clone()) .add_subscriber_task(new_subscriber_task)
.await?; .await?;
} else if let Some(system_task) = self.system_task.as_ref() {
let task_service = ctx.task();
let mut new_system_task = system_task.clone();
new_system_task.set_cron_id(Some(self.id));
task_service.add_system_task(new_system_task).await?;
} else { } else {
unimplemented!("Cron without subscriber task is not supported now"); unimplemented!("Cron without unknown task is not supported now");
} }
Ok(()) Ok(())

View File

@ -1,6 +1,7 @@
pub mod auth; pub mod auth;
pub mod bangumi; pub mod bangumi;
pub mod credential_3rd; pub mod credential_3rd;
pub mod cron;
pub mod downloaders; pub mod downloaders;
pub mod downloads; pub mod downloads;
pub mod episodes; pub mod episodes;
@ -11,4 +12,4 @@ pub mod subscribers;
pub mod subscription_bangumi; pub mod subscription_bangumi;
pub mod subscription_episode; pub mod subscription_episode;
pub mod subscriptions; pub mod subscriptions;
pub mod cron; pub mod system_tasks;

View File

@ -24,13 +24,14 @@ pub enum SubscriberTaskStatus {
Killed, Killed,
} }
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "subscriber_tasks")] #[sea_orm(table_name = "subscriber_tasks")]
pub struct Model { pub struct Model {
#[sea_orm(primary_key)] #[sea_orm(primary_key)]
pub id: String, pub id: String,
pub subscriber_id: i32, pub subscriber_id: i32,
pub subscription_id: Option<i32>, pub subscription_id: Option<i32>,
pub cron_id: Option<i32>,
pub job: SubscriberTask, pub job: SubscriberTask,
pub task_type: SubscriberTaskType, pub task_type: SubscriberTaskType,
pub status: SubscriberTaskStatus, pub status: SubscriberTaskStatus,
@ -51,7 +52,7 @@ pub enum Relation {
from = "Column::SubscriberId", from = "Column::SubscriberId",
to = "super::subscribers::Column::Id", to = "super::subscribers::Column::Id",
on_update = "Cascade", on_update = "Cascade",
on_delete = "Cascade" on_delete = "NoAction"
)] )]
Subscriber, Subscriber,
#[sea_orm( #[sea_orm(
@ -62,6 +63,14 @@ pub enum Relation {
on_delete = "NoAction" on_delete = "NoAction"
)] )]
Subscription, Subscription,
#[sea_orm(
belongs_to = "super::cron::Entity",
from = "Column::CronId",
to = "super::cron::Column::Id",
on_update = "NoAction",
on_delete = "NoAction"
)]
Cron,
} }
impl Related<super::subscribers::Entity> for Entity { impl Related<super::subscribers::Entity> for Entity {
@ -76,12 +85,20 @@ impl Related<super::subscriptions::Entity> for Entity {
} }
} }
impl Related<super::cron::Entity> for Entity {
fn to() -> RelationDef {
Relation::Cron.def()
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
pub enum RelatedEntity { pub enum RelatedEntity {
#[sea_orm(entity = "super::subscribers::Entity")] #[sea_orm(entity = "super::subscribers::Entity")]
Subscriber, Subscriber,
#[sea_orm(entity = "super::subscriptions::Entity")] #[sea_orm(entity = "super::subscriptions::Entity")]
Subscription, Subscription,
#[sea_orm(entity = "super::cron::Entity")]
Cron,
} }
#[async_trait] #[async_trait]

View File

@ -45,6 +45,8 @@ pub enum Relation {
Feed, Feed,
#[sea_orm(has_many = "super::subscriber_tasks::Entity")] #[sea_orm(has_many = "super::subscriber_tasks::Entity")]
SubscriberTask, SubscriberTask,
#[sea_orm(has_many = "super::system_tasks::Entity")]
SystemTask,
} }
impl Related<super::subscriptions::Entity> for Entity { impl Related<super::subscriptions::Entity> for Entity {
@ -95,6 +97,12 @@ impl Related<super::subscriber_tasks::Entity> for Entity {
} }
} }
impl Related<super::system_tasks::Entity> for Entity {
fn to() -> RelationDef {
Relation::SystemTask.def()
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
pub enum RelatedEntity { pub enum RelatedEntity {
#[sea_orm(entity = "super::subscriptions::Entity")] #[sea_orm(entity = "super::subscriptions::Entity")]
@ -111,6 +119,8 @@ pub enum RelatedEntity {
Feed, Feed,
#[sea_orm(entity = "super::subscriber_tasks::Entity")] #[sea_orm(entity = "super::subscriber_tasks::Entity")]
SubscriberTask, SubscriberTask,
#[sea_orm(entity = "super::system_tasks::Entity")]
SystemTask,
} }
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]

View File

@ -0,0 +1,99 @@
use async_trait::async_trait;
use sea_orm::{ActiveValue, entity::prelude::*};
pub use crate::task::{
SystemTask, SystemTaskInput, SystemTaskType, SystemTaskTypeEnum, SystemTaskTypeVariant,
SystemTaskTypeVariantIter,
};
#[derive(Clone, Debug, PartialEq, Eq, DeriveActiveEnum, EnumIter, DeriveDisplay)]
#[sea_orm(rs_type = "String", db_type = "Text")]
pub enum SystemTaskStatus {
#[sea_orm(string_value = "Pending")]
Pending,
#[sea_orm(string_value = "Scheduled")]
Scheduled,
#[sea_orm(string_value = "Running")]
Running,
#[sea_orm(string_value = "Done")]
Done,
#[sea_orm(string_value = "Failed")]
Failed,
#[sea_orm(string_value = "Killed")]
Killed,
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "system_tasks")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: String,
pub subscriber_id: Option<i32>,
pub cron_id: Option<i32>,
pub job: SystemTask,
pub task_type: SystemTaskType,
pub status: SystemTaskStatus,
pub attempts: i32,
pub max_attempts: i32,
pub run_at: DateTimeUtc,
pub last_error: Option<String>,
pub lock_at: Option<DateTimeUtc>,
pub lock_by: Option<String>,
pub done_at: Option<DateTimeUtc>,
pub priority: i32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::subscribers::Entity",
from = "Column::SubscriberId",
to = "super::subscribers::Column::Id",
on_update = "Cascade",
on_delete = "Restrict"
)]
Subscriber,
#[sea_orm(
belongs_to = "super::cron::Entity",
from = "Column::CronId",
to = "super::cron::Column::Id",
on_update = "NoAction",
on_delete = "NoAction"
)]
Cron,
}
impl Related<super::subscribers::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscriber.def()
}
}
impl Related<super::cron::Entity> for Entity {
fn to() -> RelationDef {
Relation::Cron.def()
}
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelatedEntity)]
pub enum RelatedEntity {
#[sea_orm(entity = "super::subscribers::Entity")]
Subscriber,
#[sea_orm(entity = "super::cron::Entity")]
Cron,
}
#[async_trait]
impl ActiveModelBehavior for ActiveModel {
async fn before_save<C>(mut self, _db: &C, _insert: bool) -> Result<Self, DbErr>
where
C: ConnectionTrait,
{
if let ActiveValue::Set(Some(..)) = self.subscriber_id {
return Err(DbErr::Custom(
"SystemTask can not be created by subscribers now".to_string(),
));
}
Ok(self)
}
}

View File

@ -89,6 +89,13 @@ impl StorageService {
p p
} }
#[cfg(any(test, feature = "test-utils"))]
pub fn build_test_path(&self, path: impl AsRef<Path>) -> PathBuf {
let mut p = PathBuf::from("/test");
p.push(path);
p
}
pub fn build_public_path(&self, path: impl AsRef<Path>) -> PathBuf { pub fn build_public_path(&self, path: impl AsRef<Path>) -> PathBuf {
let mut p = PathBuf::from("/public"); let mut p = PathBuf::from("/public");
p.push(path); p.push(path);

View File

@ -2,12 +2,16 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use futures::{Stream, StreamExt, pin_mut}; use futures::{Stream, StreamExt, pin_mut};
use serde::{Deserialize, Serialize, de::DeserializeOwned}; use serde::{Serialize, de::DeserializeOwned};
use crate::{app::AppContextTrait, errors::RecorderResult}; use crate::{app::AppContextTrait, errors::RecorderResult};
pub const SYSTEM_TASK_APALIS_NAME: &str = "system_task"; pub const SYSTEM_TASK_APALIS_NAME: &str = "system_task";
pub const SUBSCRIBER_TASK_APALIS_NAME: &str = "subscriber_task"; pub const SUBSCRIBER_TASK_APALIS_NAME: &str = "subscriber_task";
pub const SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_FUNCTION_NAME: &str =
"setup_apalis_jobs_extra_foreign_keys";
pub const SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_TRIGGER_NAME: &str =
"setup_apalis_jobs_extra_foreign_keys_trigger";
#[async_trait] #[async_trait]
pub trait AsyncTaskTrait: Serialize + DeserializeOwned + Sized { pub trait AsyncTaskTrait: Serialize + DeserializeOwned + Sized {
@ -41,20 +45,30 @@ where
} }
} }
pub trait SystemTaskTrait: AsyncTaskTrait {
type InputType: Serialize + DeserializeOwned + Sized + Send;
fn get_subscriber_id(&self) -> Option<i32>;
fn set_subscriber_id(&mut self, subscriber_id: Option<i32>);
fn get_cron_id(&self) -> Option<i32>;
fn set_cron_id(&mut self, cron_id: Option<i32>);
fn from_input(input: Self::InputType, subscriber_id: Option<i32>) -> Self;
}
pub trait SubscriberTaskTrait: AsyncTaskTrait { pub trait SubscriberTaskTrait: AsyncTaskTrait {
type InputType: Serialize + DeserializeOwned + Sized + Send; type InputType: Serialize + DeserializeOwned + Sized + Send;
fn get_subscriber_id(&self) -> i32; fn get_subscriber_id(&self) -> i32;
fn set_subscriber_id(&mut self, subscriber_id: i32);
fn get_cron_id(&self) -> Option<i32>; fn get_cron_id(&self) -> Option<i32>;
fn set_cron_id(&mut self, cron_id: Option<i32>);
fn from_input(input: Self::InputType, subscriber_id: i32) -> Self; fn from_input(input: Self::InputType, subscriber_id: i32) -> Self;
} }
pub trait SystemTaskTrait: AsyncTaskTrait {}
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct SubscriberTaskBase {
pub subscriber_id: i32,
pub cron_id: Option<i32>,
}

View File

@ -1,16 +0,0 @@
use sea_orm::sea_query;
#[derive(sea_query::Iden)]
pub enum ApalisSchema {
#[iden = "apalis"]
Schema,
}
#[derive(sea_query::Iden)]
pub enum ApalisJobs {
#[iden = "jobs"]
Table,
Id,
}

View File

@ -1,21 +1,22 @@
mod config; mod config;
mod core; mod core;
mod r#extern;
mod registry; mod registry;
mod service; mod service;
pub use core::{ pub use core::{
AsyncTaskTrait, SUBSCRIBER_TASK_APALIS_NAME, SYSTEM_TASK_APALIS_NAME, StreamTaskTrait, AsyncTaskTrait, SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_FUNCTION_NAME,
SubscriberTaskBase, SubscriberTaskTrait, SystemTaskTrait, SETUP_APALIS_JOBS_EXTRA_FOREIGN_KEYS_TRIGGER_NAME, SUBSCRIBER_TASK_APALIS_NAME,
SYSTEM_TASK_APALIS_NAME, StreamTaskTrait, SubscriberTaskTrait, SystemTaskTrait,
}; };
pub use config::TaskConfig; pub use config::TaskConfig;
pub use r#extern::{ApalisJobs, ApalisSchema};
pub use registry::{ pub use registry::{
OptimizeImageTask, SubscriberTask, SubscriberTaskInput, SubscriberTaskType, OptimizeImageTask, SubscriberTask, SubscriberTaskInput, SubscriberTaskType,
SubscriberTaskTypeEnum, SubscriberTaskTypeVariant, SubscriberTaskTypeVariantIter, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant, SubscriberTaskTypeVariantIter,
SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask,
SyncOneSubscriptionSourcesTask, SystemTask, SystemTaskType, SystemTaskTypeEnum, SyncOneSubscriptionSourcesTask, SystemTask, SystemTaskInput, SystemTaskType,
SystemTaskTypeVariant, SystemTaskTypeVariantIter, SystemTaskTypeEnum, SystemTaskTypeVariant, SystemTaskTypeVariantIter,
}; };
#[allow(unused_imports)]
pub(crate) use registry::{register_subscriber_task_type, register_system_task_type};
pub use service::TaskService; pub use service::TaskService;

View File

@ -1,12 +1,14 @@
mod subscriber; mod subscriber;
mod system; mod system;
pub(crate) use subscriber::register_subscriber_task_type;
pub use subscriber::{ pub use subscriber::{
SubscriberTask, SubscriberTaskInput, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTask, SubscriberTaskInput, SubscriberTaskType, SubscriberTaskTypeEnum,
SubscriberTaskTypeVariant, SubscriberTaskTypeVariantIter, SyncOneSubscriptionFeedsFullTask, SubscriberTaskTypeVariant, SubscriberTaskTypeVariantIter, SyncOneSubscriptionFeedsFullTask,
SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionSourcesTask, SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionSourcesTask,
}; };
pub(crate) use system::register_system_task_type;
pub use system::{ pub use system::{
OptimizeImageTask, SystemTask, SystemTaskType, SystemTaskTypeEnum, SystemTaskTypeVariant, OptimizeImageTask, SystemTask, SystemTaskInput, SystemTaskType, SystemTaskTypeEnum,
SystemTaskTypeVariantIter, SystemTaskTypeVariant, SystemTaskTypeVariantIter,
}; };

View File

@ -7,7 +7,7 @@ macro_rules! register_subscriber_task_type {
) => { ) => {
$(#[$type_meta])* $(#[$type_meta])*
#[derive(typed_builder::TypedBuilder, ts_rs::TS, serde::Serialize, serde::Deserialize)] #[derive(typed_builder::TypedBuilder, ts_rs::TS, serde::Serialize, serde::Deserialize)]
#[ts(export, rename_all = "camelCase")] #[ts(rename_all = "camelCase")]
$task_vis struct $task_name { $task_vis struct $task_name {
$($(#[$field_meta])* pub $field_name: $field_type,)* $($(#[$field_meta])* pub $field_name: $field_type,)*
pub subscriber_id: i32, pub subscriber_id: i32,
@ -20,7 +20,7 @@ macro_rules! register_subscriber_task_type {
$(#[$type_meta])* $(#[$type_meta])*
#[derive(ts_rs::TS, serde::Serialize, serde::Deserialize)] #[derive(ts_rs::TS, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
#[ts(export, rename_all = "camelCase")] #[ts(rename_all = "camelCase")]
$task_vis struct [<$task_name Input>] { $task_vis struct [<$task_name Input>] {
$($(#[$field_meta])* pub $field_name: $field_type,)* $($(#[$field_meta])* pub $field_name: $field_type,)*
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
@ -44,6 +44,14 @@ macro_rules! register_subscriber_task_type {
self.cron_id self.cron_id
} }
fn set_subscriber_id(&mut self, subscriber_id: i32) {
self.subscriber_id = subscriber_id;
}
fn set_cron_id(&mut self, cron_id: Option<i32>) {
self.cron_id = cron_id;
}
fn from_input(input: Self::InputType, subscriber_id: i32) -> Self { fn from_input(input: Self::InputType, subscriber_id: i32) -> Self {
Self { Self {
$($field_name: input.$field_name,)* $($field_name: input.$field_name,)*

View File

@ -1,6 +1,7 @@
mod base; mod base;
mod subscription; mod subscription;
pub(crate) use base::register_subscriber_task_type;
use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult}; use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult};
pub use subscription::{ pub use subscription::{
SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask,
@ -67,23 +68,6 @@ macro_rules! register_subscriber_task_types {
} }
} }
impl TryFrom<$task_enum_name> for serde_json::Value {
type Error = $crate::errors::RecorderError;
fn try_from(value: $task_enum_name) -> Result<Self, Self::Error> {
let json_value = serde_json::to_value(value)?;
Ok(match json_value {
serde_json::Value::Object(mut map) => {
map.remove("task_type");
serde_json::Value::Object(map)
}
_ => {
unreachable!("subscriber task must be an json object");
}
})
}
}
impl $task_enum_name { impl $task_enum_name {
pub fn task_type(&self) -> $type_enum_name { pub fn task_type(&self) -> $type_enum_name {
match self { match self {
@ -121,6 +105,18 @@ macro_rules! register_subscriber_task_types {
} }
} }
fn set_subscriber_id(&mut self, subscriber_id: i32) {
match self {
$(Self::$task_variant(t) => t.set_subscriber_id(subscriber_id),)*
}
}
fn set_cron_id(&mut self, cron_id: Option<i32>) {
match self {
$(Self::$task_variant(t) => t.set_cron_id(cron_id),)*
}
}
fn from_input(input: Self::InputType, subscriber_id: i32) -> Self { fn from_input(input: Self::InputType, subscriber_id: i32) -> Self {
match input { match input {
$(Self::InputType::$task_variant(t) => $(Self::InputType::$task_variant(t) =>
@ -159,7 +155,7 @@ register_subscriber_task_types!(
} }
}, },
task_enum: { task_enum: {
#[derive(Clone, Debug, PartialEq, Eq, FromJsonQueryResult)] #[derive(Clone, Debug, PartialEq, FromJsonQueryResult)]
pub enum SubscriberTask { pub enum SubscriberTask {
SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask), SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask),
SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask), SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask),

View File

@ -39,7 +39,7 @@ macro_rules! register_subscription_task_type {
} }
register_subscription_task_type! { register_subscription_task_type! {
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq)]
pub struct SyncOneSubscriptionFeedsIncrementalTask { pub struct SyncOneSubscriptionFeedsIncrementalTask {
} => async |subscription, ctx| -> RecorderResult<()> { } => async |subscription, ctx| -> RecorderResult<()> {
subscription.sync_feeds_incremental(ctx).await?; subscription.sync_feeds_incremental(ctx).await?;
@ -48,7 +48,7 @@ register_subscription_task_type! {
} }
register_subscription_task_type! { register_subscription_task_type! {
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq)]
pub struct SyncOneSubscriptionFeedsFullTask { pub struct SyncOneSubscriptionFeedsFullTask {
} => async |subscription, ctx| -> RecorderResult<()> { } => async |subscription, ctx| -> RecorderResult<()> {
subscription.sync_feeds_full(ctx).await?; subscription.sync_feeds_full(ctx).await?;
@ -57,7 +57,7 @@ register_subscription_task_type! {
} }
register_subscription_task_type! { register_subscription_task_type! {
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq)]
pub struct SyncOneSubscriptionSourcesTask { pub struct SyncOneSubscriptionSourcesTask {
} => async |subscription, ctx| -> RecorderResult<()> { } => async |subscription, ctx| -> RecorderResult<()> {
subscription.sync_sources(ctx).await?; subscription.sync_sources(ctx).await?;

View File

@ -0,0 +1,67 @@
macro_rules! register_system_task_type {
(
$(#[$type_meta:meta])*
$task_vis:vis struct $task_name:ident {
$($(#[$field_meta:meta])* pub $field_name:ident: $field_type:ty),* $(,)?
}
) => {
$(#[$type_meta])*
#[derive(typed_builder::TypedBuilder, ts_rs::TS, serde::Serialize, serde::Deserialize)]
#[ts(rename_all = "camelCase")]
$task_vis struct $task_name {
$($(#[$field_meta])* pub $field_name: $field_type,)*
#[serde(default, skip_serializing_if = "Option::is_none")]
#[builder(default = None)]
pub subscriber_id: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[builder(default = None)]
pub cron_id: Option<i32>,
}
paste::paste! {
$(#[$type_meta])*
#[derive(ts_rs::TS, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
$task_vis struct [<$task_name Input>] {
$($(#[$field_meta])* pub $field_name: $field_type,)*
#[serde(default, skip_serializing_if = "Option::is_none")]
pub subscriber_id: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cron_id: Option<i32>,
}
}
impl $crate::task::SystemTaskTrait for $task_name {
paste::paste! {
type InputType = [<$task_name Input>];
}
fn get_subscriber_id(&self) -> Option<i32> {
self.subscriber_id
}
fn get_cron_id(&self) -> Option<i32> {
self.cron_id
}
fn set_subscriber_id(&mut self, subscriber_id: Option<i32>) {
self.subscriber_id = subscriber_id;
}
fn set_cron_id(&mut self, cron_id: Option<i32>) {
self.cron_id = cron_id;
}
fn from_input(input: Self::InputType, subscriber_id: Option<i32>) -> Self {
Self {
$($field_name: input.$field_name,)*
subscriber_id: input.subscriber_id.or(subscriber_id),
cron_id: input.cron_id,
}
}
}
}
}
pub(crate) use register_system_task_type;

View File

@ -1,19 +1,23 @@
use std::sync::Arc; use std::sync::Arc;
use quirks_path::Path; use quirks_path::Path;
use serde::{Deserialize, Serialize};
use tracing::instrument; use tracing::instrument;
use crate::{ use crate::{
app::AppContextTrait, errors::RecorderResult, media::EncodeImageOptions, task::AsyncTaskTrait, app::AppContextTrait,
errors::RecorderResult,
media::EncodeImageOptions,
task::{AsyncTaskTrait, register_system_task_type},
}; };
#[derive(Clone, Debug, Serialize, Deserialize)] register_system_task_type! {
#[derive(Clone, Debug, PartialEq)]
pub struct OptimizeImageTask { pub struct OptimizeImageTask {
pub source_path: String, pub source_path: String,
pub target_path: String, pub target_path: String,
pub format_options: EncodeImageOptions, pub format_options: EncodeImageOptions,
} }
}
#[async_trait::async_trait] #[async_trait::async_trait]
impl AsyncTaskTrait for OptimizeImageTask { impl AsyncTaskTrait for OptimizeImageTask {

View File

@ -1,14 +1,15 @@
mod base;
mod media; mod media;
pub(crate) use base::register_system_task_type;
pub use media::OptimizeImageTask; pub use media::OptimizeImageTask;
use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult}; use sea_orm::{DeriveActiveEnum, DeriveDisplay, EnumIter, FromJsonQueryResult};
use serde::{Deserialize, Serialize};
macro_rules! register_system_task_types { macro_rules! register_system_task_types {
( (
task_type_enum: { task_type_enum: {
$(#[$type_enum_meta:meta])* $(#[$type_enum_meta:meta])*
pub enum $type_enum_name:ident { $type_vis:vis enum $type_enum_name:ident {
$( $(
$(#[$variant_meta:meta])* $(#[$variant_meta:meta])*
$variant:ident => $string_value:literal $variant:ident => $string_value:literal
@ -17,16 +18,18 @@ macro_rules! register_system_task_types {
}, },
task_enum: { task_enum: {
$(#[$task_enum_meta:meta])* $(#[$task_enum_meta:meta])*
pub enum $task_enum_name:ident { $task_vis:vis enum $task_enum_name:ident {
$( $(
$(#[$task_variant_meta:meta])*
$task_variant:ident($task_type:ty) $task_variant:ident($task_type:ty)
),* $(,)? ),* $(,)?
} }
} }
) => { ) => {
$(#[$type_enum_meta])* $(#[$type_enum_meta])*
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[sea_orm(rs_type = "String", db_type = "Text")] #[sea_orm(rs_type = "String", db_type = "Text")]
pub enum $type_enum_name { $type_vis enum $type_enum_name {
$( $(
$(#[$variant_meta])* $(#[$variant_meta])*
#[serde(rename = $string_value)] #[serde(rename = $string_value)]
@ -37,30 +40,17 @@ macro_rules! register_system_task_types {
$(#[$task_enum_meta])* $(#[$task_enum_meta])*
#[derive(ts_rs::TS, serde::Serialize, serde::Deserialize, PartialEq)]
#[serde(tag = "task_type")] #[serde(tag = "task_type")]
pub enum $task_enum_name { #[ts(export, rename = "SystemTaskType", rename_all = "camelCase", tag = "taskType")]
$task_vis enum $task_enum_name {
$( $(
$(#[$task_variant_meta])*
#[serde(rename = $string_value)]
$task_variant($task_type), $task_variant($task_type),
)* )*
} }
impl TryFrom<$task_enum_name> for serde_json::Value {
type Error = $crate::errors::RecorderError;
fn try_from(value: $task_enum_name) -> Result<Self, Self::Error> {
let json_value = serde_json::to_value(value)?;
Ok(match json_value {
serde_json::Value::Object(mut map) => {
map.remove("task_type");
serde_json::Value::Object(map)
}
_ => {
unreachable!("subscriber task must be an json object");
}
})
}
}
impl $task_enum_name { impl $task_enum_name {
pub fn task_type(&self) -> $type_enum_name { pub fn task_type(&self) -> $type_enum_name {
match self { match self {
@ -69,6 +59,21 @@ macro_rules! register_system_task_types {
} }
} }
paste::paste! {
$(#[$task_enum_meta])*
#[derive(ts_rs::TS, serde::Serialize, serde::Deserialize, PartialEq)]
#[serde(tag = "taskType", rename_all = "camelCase")]
#[ts(export, rename_all = "camelCase", tag = "taskType")]
$task_vis enum [<$task_enum_name Input>] {
$(
$(#[$task_variant_meta])*
#[serde(rename = $string_value)]
$task_variant(<$task_type as $crate::task::SystemTaskTrait>::InputType),
)*
}
}
#[async_trait::async_trait] #[async_trait::async_trait]
impl $crate::task::AsyncTaskTrait for $task_enum_name { impl $crate::task::AsyncTaskTrait for $task_enum_name {
async fn run_async(self, ctx: std::sync::Arc<dyn $crate::app::AppContextTrait>) -> $crate::errors::RecorderResult<()> { async fn run_async(self, ctx: std::sync::Arc<dyn $crate::app::AppContextTrait>) -> $crate::errors::RecorderResult<()> {
@ -78,18 +83,60 @@ macro_rules! register_system_task_types {
} }
} }
} }
impl $crate::task::SystemTaskTrait for $task_enum_name {
paste::paste! {
type InputType = [<$task_enum_name Input>];
}
fn get_subscriber_id(&self) -> Option<i32> {
match self {
$(Self::$task_variant(t) => t.get_subscriber_id(),)*
}
}
fn get_cron_id(&self) -> Option<i32> {
match self {
$(Self::$task_variant(t) => t.get_cron_id(),)*
}
}
fn set_subscriber_id(&mut self, subscriber_id: Option<i32>) {
match self {
$(Self::$task_variant(t) => t.set_subscriber_id(subscriber_id),)*
}
}
fn set_cron_id(&mut self, cron_id: Option<i32>) {
match self {
$(Self::$task_variant(t) => t.set_cron_id(cron_id),)*
}
}
fn from_input(input: Self::InputType, subscriber_id: Option<i32>) -> Self {
match input {
$(Self::InputType::$task_variant(t) =>
Self::$task_variant(<$task_type as $crate::task::SystemTaskTrait>::from_input(t, subscriber_id)),)*
}
}
}
$(
impl From<$task_type> for $task_enum_name {
fn from(task: $task_type) -> Self {
Self::$task_variant(task)
}
}
)*
}; };
} }
#[cfg(not(any(test, feature = "test-utils")))]
register_system_task_types! { register_system_task_types! {
task_type_enum: { task_type_enum: {
#[derive( #[derive(
Clone, Clone,
Debug, Debug,
Serialize,
Deserialize,
PartialEq,
Eq,
Copy, Copy,
DeriveActiveEnum, DeriveActiveEnum,
DeriveDisplay, DeriveDisplay,
@ -100,9 +147,34 @@ register_system_task_types! {
} }
}, },
task_enum: { task_enum: {
#[derive(Clone, Debug, Serialize, Deserialize, FromJsonQueryResult)] #[derive(Clone, Debug, FromJsonQueryResult)]
pub enum SystemTask {
OptimizeImage(OptimizeImageTask)
}
}
}
#[cfg(any(test, feature = "test-utils"))]
register_system_task_types! {
task_type_enum: {
#[derive(
Clone,
Debug,
Copy,
DeriveActiveEnum,
DeriveDisplay,
EnumIter
)]
pub enum SystemTaskType {
OptimizeImage => "optimize_image",
Test => "test",
}
},
task_enum: {
#[derive(Clone, Debug, FromJsonQueryResult)]
pub enum SystemTask { pub enum SystemTask {
OptimizeImage(OptimizeImageTask), OptimizeImage(OptimizeImageTask),
Test(crate::test_utils::task::TestSystemTask),
} }
} }
} }

View File

@ -294,3 +294,31 @@ impl TaskService {
} }
} }
} }
#[cfg(test)]
#[allow(unused_variables)]
mod tests {
use rstest::{fixture, rstest};
use tracing::Level;
use super::*;
use crate::test_utils::{app::TestingPreset, tracing::try_init_testing_tracing};
#[fixture]
fn before_each() {
try_init_testing_tracing(Level::DEBUG);
}
#[rstest]
#[tokio::test]
async fn test_cron_due_listening(before_each: ()) -> RecorderResult<()> {
let mut preset = TestingPreset::default().await?;
let app_ctx = preset.app_ctx.clone();
let db = app_ctx.db();
todo!();
Ok(())
}
}

View File

@ -5,11 +5,12 @@ use typed_builder::TypedBuilder;
use crate::{ use crate::{
app::AppContextTrait, app::AppContextTrait,
errors::RecorderResult,
test_utils::{ test_utils::{
crypto::build_testing_crypto_service, crypto::build_testing_crypto_service,
database::{TestingDatabaseServiceConfig, build_testing_database_service}, database::{TestingDatabaseServiceConfig, build_testing_database_service},
media::build_testing_media_service, media::build_testing_media_service,
mikan::build_testing_mikan_client, mikan::{MikanMockServer, build_testing_mikan_client},
storage::build_testing_storage_service, storage::build_testing_storage_service,
task::build_testing_task_service, task::build_testing_task_service,
}, },
@ -42,10 +43,8 @@ impl TestingAppContext {
self.task.get_or_init(|| task); self.task.get_or_init(|| task);
} }
pub async fn from_preset( pub async fn from_preset(preset: TestingAppContextPreset) -> RecorderResult<Arc<Self>> {
preset: TestingAppContextPreset, let mikan_client = build_testing_mikan_client(preset.mikan_base_url).await?;
) -> crate::errors::RecorderResult<Arc<Self>> {
let mikan_client = build_testing_mikan_client(preset.mikan_base_url.clone()).await?;
let db_service = let db_service =
build_testing_database_service(preset.database_config.unwrap_or_default()).await?; build_testing_database_service(preset.database_config.unwrap_or_default()).await?;
let crypto_service = build_testing_crypto_service().await?; let crypto_service = build_testing_crypto_service().await?;
@ -137,3 +136,28 @@ pub struct TestingAppContextPreset {
pub mikan_base_url: String, pub mikan_base_url: String,
pub database_config: Option<TestingDatabaseServiceConfig>, pub database_config: Option<TestingDatabaseServiceConfig>,
} }
#[derive(TypedBuilder)]
pub struct TestingPreset {
pub mikan_server: MikanMockServer,
pub app_ctx: Arc<dyn AppContextTrait>,
}
impl TestingPreset {
pub async fn default() -> RecorderResult<Self> {
let mikan_server = MikanMockServer::new().await?;
let database_config = TestingDatabaseServiceConfig::default();
let app_ctx = TestingAppContext::from_preset(TestingAppContextPreset {
mikan_base_url: mikan_server.base_url().to_string(),
database_config: Some(database_config),
})
.await?;
let preset = Self::builder()
.mikan_server(mikan_server)
.app_ctx(app_ctx)
.build();
Ok(preset)
}
}

View File

@ -3,6 +3,7 @@ use crate::{
errors::RecorderResult, errors::RecorderResult,
}; };
#[derive(Clone, Debug)]
pub struct TestingDatabaseServiceConfig { pub struct TestingDatabaseServiceConfig {
pub auto_migrate: bool, pub auto_migrate: bool,
} }

View File

@ -1,5 +1,6 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
fmt::Debug,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
path::{self, PathBuf}, path::{self, PathBuf},
}; };
@ -148,13 +149,15 @@ impl AsRef<path::Path> for MikanDoppelPath {
} }
} }
#[cfg(any(test, debug_assertions, feature = "test-utils"))]
lazy_static! { lazy_static! {
static ref TEST_RESOURCES_DIR: String = static ref TEST_RESOURCES_DIR: String =
if cfg!(any(test, debug_assertions, feature = "playground")) { format!("{}/tests/resources", env!("CARGO_MANIFEST_DIR"));
format!("{}/tests/resources", env!("CARGO_MANIFEST_DIR")) }
} else {
"tests/resources".to_string() #[cfg(not(any(test, debug_assertions, feature = "test-utils")))]
}; lazy_static! {
static ref TEST_RESOURCES_DIR: String = "tests/resources".to_string();
} }
impl From<Url> for MikanDoppelPath { impl From<Url> for MikanDoppelPath {
@ -227,6 +230,14 @@ pub struct MikanMockServer {
base_url: Url, base_url: Url,
} }
impl Debug for MikanMockServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MikanMockServer")
.field("base_url", &self.base_url)
.finish()
}
}
impl MikanMockServer { impl MikanMockServer {
pub async fn new_with_port(port: u16) -> RecorderResult<Self> { pub async fn new_with_port(port: u16) -> RecorderResult<Self> {
let server = mockito::Server::new_with_opts_async(mockito::ServerOpts { let server = mockito::Server::new_with_opts_async(mockito::ServerOpts {

View File

@ -1,15 +1,43 @@
use std::sync::Arc; use std::sync::Arc;
use chrono::Utc;
use crate::{ use crate::{
app::AppContextTrait, app::AppContextTrait,
errors::RecorderResult, errors::RecorderResult,
task::{TaskConfig, TaskService}, task::{AsyncTaskTrait, TaskConfig, TaskService, register_system_task_type},
}; };
register_system_task_type! {
#[derive(Debug, Clone, PartialEq)]
pub struct TestSystemTask {
pub task_id: String,
}
}
#[async_trait::async_trait]
impl AsyncTaskTrait for TestSystemTask {
async fn run_async(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let storage = ctx.storage();
storage
.write(
storage.build_test_path(self.task_id),
serde_json::json!({ "exec_time": Utc::now().timestamp_millis() })
.to_string()
.into(),
)
.await?;
Ok(())
}
}
pub async fn build_testing_task_service( pub async fn build_testing_task_service(
ctx: Arc<dyn AppContextTrait>, ctx: Arc<dyn AppContextTrait>,
) -> RecorderResult<TaskService> { ) -> RecorderResult<TaskService> {
let config = TaskConfig::default(); let config = TaskConfig::default();
let task_service = TaskService::from_config_and_ctx(config, ctx).await?; let task_service = TaskService::from_config_and_ctx(config, ctx).await?;
Ok(task_service) Ok(task_service)
} }

View File

@ -1,6 +1,10 @@
set windows-shell := ["pwsh.exe", "-c"] set windows-shell := ["pwsh.exe", "-c"]
set dotenv-load := true set dotenv-load := true
clean-cargo-incremental:
# https://github.com/rust-lang/rust/issues/141540
rm -r target/debug/incremental
prepare-dev: prepare-dev:
cargo install cargo-binstall cargo install cargo-binstall
cargo binstall sea-orm-cli cargo-llvm-cov cargo-nextest cargo binstall sea-orm-cli cargo-llvm-cov cargo-nextest

View File

@ -3,7 +3,10 @@
"version": "0.0.0", "version": "0.0.0",
"description": "Kono bangumi?", "description": "Kono bangumi?",
"license": "MIT", "license": "MIT",
"workspaces": ["packages/*", "apps/*"], "workspaces": [
"packages/*",
"apps/*"
],
"type": "module", "type": "module",
"repository": { "repository": {
"type": "git", "type": "git",
@ -20,17 +23,17 @@
"node": ">=22" "node": ">=22"
}, },
"dependencies": { "dependencies": {
"es-toolkit": "^1.39.3" "es-toolkit": "^1.39.6"
}, },
"devDependencies": { "devDependencies": {
"@biomejs/biome": "1.9.4", "@biomejs/biome": "1.9.4",
"@types/node": "^24.0.1", "@types/node": "^24.0.10",
"cross-env": "^7.0.3", "cross-env": "^7.0.3",
"kill-port": "^2.0.1", "kill-port": "^2.0.1",
"npm-run-all": "^4.1.5", "npm-run-all": "^4.1.5",
"tsx": "^4.20.2", "tsx": "^4.20.3",
"typescript": "^5.8.3", "typescript": "^5.8.3",
"ultracite": "^4.2.10" "ultracite": "^4.2.13"
}, },
"pnpm": { "pnpm": {
"overrides": { "overrides": {

552
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff