feat: add dal context and fill pull items

This commit is contained in:
master 2024-03-29 00:30:00 +08:00
parent 8a03dc28a2
commit 035d4e20dd
45 changed files with 1561 additions and 1061 deletions

2
.gitignore vendored
View File

@ -112,7 +112,7 @@ coverage
# nyc tests coverage
.nyc_output
# Grunt intermediate dal (https://gruntjs.com/creating-plugins#storing-task-files)
# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# Bower dependency directory (https://bower.io/)

837
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,9 @@
cargo-features = ["codegen-backend"]
[workspace]
members = ["crates/quirks_path", "crates/recorder"]
members = [
"crates/quirks_path",
"crates/recorder"
]
resolver = "2"
[profile.dev]

View File

@ -67,22 +67,6 @@ workers:
# - BackgroundAsync - Workers operate asynchronously in the background, processing tasks with async capabilities.
mode: BackgroundQueue
# Mailer Configuration.
mailer:
# SMTP mailer configuration.
smtp:
# Enable/Disable smtp mailer.
enable: true
# SMTP server host. e.x localhost, smtp.gmail.com
host: '{{ get_env(name="MAILER_HOST", default="localhost") }}'
# SMTP server port
port: 1025
# Use secure connection (SSL/TLS).
secure: false
# auth:
# user:
# password:
# Database Configuration
database:
# Database connection URI
@ -104,10 +88,3 @@ database:
# Recreating schema when application loaded. This is a dangerous operation, make sure that you using this flag only on dev environments or test mode
dangerously_recreate: false
# Redis Configuration
redis:
# Redis connection URI
uri: '{{ get_env(name="REDIS_URL", default="redis://127.0.0.1:6379") }}'
# Dangerously flush all data in Redis on startup. dangerous operation, make sure that you using this flag only on dev environments or test mode
dangerously_flush: false

View File

@ -62,27 +62,10 @@ workers:
# - BackgroundAsync - Workers operate asynchronously in the background, processing tasks with async capabilities.
mode: ForegroundBlocking
# Mailer Configuration.
mailer:
# SMTP mailer configuration.
smtp:
# Enable/Disable smtp mailer.
enable: true
# SMTP server host. e.x localhost, smtp.gmail.com
host: localhost
# SMTP server port
port: 1025
# Use secure connection (SSL/TLS).
secure: false
# auth:
# user:
# password:
stub: true
# Database Configuration
database:
# Database connection URI
uri: {{get_env(name="DATABASE_URL", default="postgres://loco:loco@localhost:5432/recorder_test")}}
uri: {{get_env(name="DATABASE_URL", default="postgres://loco:loco@localhost:5432/recorder_test")}}
# When enabled, the sql query will be logged.
enable_logging: false
# Set the timeout duration when acquiring a connection.
@ -107,12 +90,3 @@ redis:
# Dangerously flush all data in Redis on startup. dangerous operation, make sure that you using this flag only on dev environments or test mode
dangerously_flush: false
# Authentication Configuration
auth:
# JWT authentication
jwt:
# Secret key for token generation and verification
secret: ZknFYqXpnDgaWcKJZ5J5
# Token expiration time in seconds
expiration: 604800 # 7 days

View File

@ -18,30 +18,28 @@ default = []
testcontainers = []
[dependencies]
loco-rs = { version = "0.3.1" }
loco-rs = { version = "0.3.2" }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
eyre = "0.6"
tokio = { version = "1.33.0", default-features = false }
async-trait = "0.1.74"
tokio = { version = "1.36.0", default-features = false }
async-trait = "0.1.79"
tracing = "0.1.40"
chrono = "0.4"
validator = { version = "0.16" }
sea-orm = { version = "1.0.0-rc.1", features = [
validator = { version = "0.17" }
sea-orm = { version = "1.0.0-rc.3", features = [
"sqlx-sqlite",
"sqlx-postgres",
"runtime-tokio-rustls",
"macros",
] }
axum = "0.7.1"
include_dir = "0.7"
uuid = { version = "1.6.0", features = ["v4"] }
axum = "0.7.5"
uuid = { version = "1.8.0", features = ["v4"] }
tracing-subscriber = { version = "0.3.17", features = ["env-filter", "json"] }
sea-orm-migration = { version = "1.0.0-rc.1", features = [
sea-orm-migration = { version = "1.0.0-rc.3", features = [
"runtime-tokio-rustls",
] }
reqwest = "0.11.24"
reqwest = { version = "0.12.2", features = ["json"] }
thiserror = "1.0.57"
rss = "2.0.7"
bytes = "1.5.0"
@ -56,19 +54,19 @@ maplit = "1.0.2"
tl = { version = "0.7.8", features = ["simd"] }
lightningcss = "1.0.0-alpha.54"
html-escape = "0.2.13"
opendal = "0.45.0"
librqbit-core = "3.5.0"
opendal = "0.45.1"
librqbit-core = "3.6.1"
quirks_path = { path = "../quirks_path" }
tokio-utils = "0.1.2"
weak-table = "0.3.2"
oxilangtag = { version = "0.1.5", features = ["serde"] }
dateparser = "0.2.1"
dotenv = "0.15.0"
weak-table = "0.3.2"
[dev-dependencies]
serial_test = "2.0.0"
serial_test = "3.0.0"
rstest = "0.18.2"
loco-rs = { version = "0.3.1", features = ["testing"] }
insta = { version = "1.34.0", features = ["redactions", "yaml", "filters"] }
loco-rs = { version = "0.3.2", features = ["testing"] }
insta = { version = "1.3", features = ["redactions", "yaml", "filters"] }
testcontainers = { version = "0.15.0" }
testcontainers-modules = { version = "0.3.5" }
testcontainers-modules = { version = "0.3.6" }

View File

@ -2,7 +2,7 @@ use std::path::Path;
use async_trait::async_trait;
use loco_rs::{
app::{AppContext, Hooks},
app::{AppContext, Hooks, Initializer},
boot::{create_app, BootResult, StartMode},
controller::AppRoutes,
db::truncate_table,
@ -14,7 +14,7 @@ use loco_rs::{
use sea_orm::DatabaseConnection;
use crate::{
controllers, migrations::Migrator, models::entities::subscribers,
controllers, migrations::Migrator, models::entities::subscribers, storage::AppDalInitializer,
workers::subscription::SubscriptionWorker,
};
@ -60,4 +60,8 @@ impl Hooks for App {
async fn seed(_db: &DatabaseConnection, _base: &Path) -> Result<()> {
Ok(())
}
async fn initializers(_ctx: &AppContext) -> Result<Vec<Box<dyn Initializer>>> {
Ok(vec![Box::new(AppDalInitializer)])
}
}

View File

@ -4,7 +4,7 @@ use eyre::OptionExt;
use itertools::Itertools;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
pub const DAL_CONF_KEY: &str = "dal";
pub const DAL_CONF_KEY: &str = "storage";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppCustomConf {

View File

@ -0,0 +1,76 @@
use axum::http::HeaderMap;
use bytes::Bytes;
use serde::de::DeserializeOwned;
use tokio_utils::RateLimiter;
use crate::downloaders::defs::DEFAULT_USER_AGENT;
pub struct ApiClient {
headers: HeaderMap,
rate_limiter: RateLimiter,
fetch_client: reqwest::Client,
}
impl ApiClient {
pub fn new(
throttle_duration: std::time::Duration,
override_headers: Option<HeaderMap>,
) -> eyre::Result<Self> {
Ok(Self {
headers: override_headers.unwrap_or_else(HeaderMap::new),
rate_limiter: RateLimiter::new(throttle_duration),
fetch_client: reqwest::Client::builder()
.user_agent(DEFAULT_USER_AGENT)
.build()?,
})
}
pub async fn fetch_json<R, F>(&self, f: F) -> Result<R, reqwest::Error>
where
F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder,
R: DeserializeOwned,
{
self.rate_limiter
.throttle(|| async {
f(&self.fetch_client)
.headers(self.headers.clone())
.send()
.await?
.json::<R>()
.await
})
.await
}
pub async fn fetch_bytes<F>(&self, f: F) -> Result<Bytes, reqwest::Error>
where
F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder,
{
self.rate_limiter
.throttle(|| async {
f(&self.fetch_client)
.headers(self.headers.clone())
.send()
.await?
.bytes()
.await
})
.await
}
pub async fn fetch_text<F>(&self, f: F) -> Result<String, reqwest::Error>
where
F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder,
{
self.rate_limiter
.throttle(|| async {
f(&self.fetch_client)
.headers(self.headers.clone())
.send()
.await?
.text()
.await
})
.await
}
}

View File

@ -1,23 +1,11 @@
use bytes::Bytes;
use itertools::Itertools;
use lazy_static::lazy_static;
use librqbit_core::{
magnet::Magnet,
torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Owned},
};
pub use qbit_rs::model::{
Torrent as QbitTorrent, TorrentContent as QbitTorrentContent,
TorrentFilter as QbitTorrentFilter, TorrentSource as QbitTorrentSource,
};
use regex::Regex;
use reqwest::{header::HeaderMap, IntoUrl};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio_utils::RateLimiter;
use url::Url;
use reqwest::IntoUrl;
use super::error::DownloaderError;
async fn download_bytes<T: IntoUrl>(url: T) -> eyre::Result<Bytes> {
pub(crate) async fn download_bytes<T: IntoUrl>(url: T) -> eyre::Result<Bytes> {
let request_client = reqwest::Client::builder()
.user_agent(DEFAULT_USER_AGENT)
.build()?;
@ -28,303 +16,3 @@ async fn download_bytes<T: IntoUrl>(url: T) -> eyre::Result<Bytes> {
pub const BITTORRENT_MIME_TYPE: &str = "application/x-bittorrent";
pub const MAGNET_SCHEMA: &str = "magnet";
pub const DEFAULT_USER_AGENT: &str = "Wget/1.13.4 (linux-gnu)";
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TorrentFilter {
All,
Downloading,
Completed,
Paused,
Active,
Inactive,
Resumed,
Stalled,
StalledUploading,
StalledDownloading,
Errored,
}
impl From<TorrentFilter> for QbitTorrentFilter {
fn from(val: TorrentFilter) -> Self {
match val {
TorrentFilter::All => QbitTorrentFilter::All,
TorrentFilter::Downloading => QbitTorrentFilter::Downloading,
TorrentFilter::Completed => QbitTorrentFilter::Completed,
TorrentFilter::Paused => QbitTorrentFilter::Paused,
TorrentFilter::Active => QbitTorrentFilter::Active,
TorrentFilter::Inactive => QbitTorrentFilter::Inactive,
TorrentFilter::Resumed => QbitTorrentFilter::Resumed,
TorrentFilter::Stalled => QbitTorrentFilter::Stalled,
TorrentFilter::StalledUploading => QbitTorrentFilter::StalledUploading,
TorrentFilter::StalledDownloading => QbitTorrentFilter::StalledDownloading,
TorrentFilter::Errored => QbitTorrentFilter::Errored,
}
}
}
lazy_static! {
static ref TORRENT_HASH_RE: Regex = Regex::new(r"[a-fA-F0-9]{40}").unwrap();
static ref TORRENT_EXT_RE: Regex = Regex::new(r"\.torrent$").unwrap();
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TorrentSource {
MagnetUrl {
url: Url,
hash: String,
},
TorrentUrl {
url: Url,
hash: String,
},
TorrentFile {
torrent: Vec<u8>,
hash: String,
name: Option<String>,
},
}
impl TorrentSource {
pub async fn parse(url: &str) -> eyre::Result<Self> {
let url = Url::parse(url)?;
let source = if url.scheme() == MAGNET_SCHEMA {
TorrentSource::from_magnet_url(url)?
} else if let Some(basename) = url
.clone()
.path_segments()
.and_then(|segments| segments.last())
{
if let (Some(match_hash), true) = (
TORRENT_HASH_RE.find(basename),
TORRENT_EXT_RE.is_match(basename),
) {
TorrentSource::from_torrent_url(url, match_hash.as_str().to_string())?
} else {
let contents = download_bytes(url).await?;
TorrentSource::from_torrent_file(contents.to_vec(), Some(basename.to_string()))?
}
} else {
let contents = download_bytes(url).await?;
TorrentSource::from_torrent_file(contents.to_vec(), None)?
};
Ok(source)
}
pub fn from_torrent_file(file: Vec<u8>, name: Option<String>) -> eyre::Result<Self> {
let torrent: TorrentMetaV1Owned =
torrent_from_bytes(&file).map_err(|_| DownloaderError::InvalidTorrentFileFormat)?;
let hash = torrent.info_hash.as_string();
Ok(TorrentSource::TorrentFile {
torrent: file,
hash,
name,
})
}
pub fn from_magnet_url(url: Url) -> eyre::Result<Self> {
if url.scheme() != MAGNET_SCHEMA {
Err(DownloaderError::InvalidUrlSchema {
found: url.scheme().to_string(),
expected: MAGNET_SCHEMA.to_string(),
}
.into())
} else {
let magnet =
Magnet::parse(url.as_str()).map_err(|_| DownloaderError::InvalidMagnetFormat {
url: url.as_str().to_string(),
})?;
let hash = magnet.info_hash.as_string();
Ok(TorrentSource::MagnetUrl { url, hash })
}
}
pub fn from_torrent_url(url: Url, hash: String) -> eyre::Result<Self> {
Ok(TorrentSource::TorrentUrl { url, hash })
}
pub fn hash(&self) -> &str {
match self {
TorrentSource::MagnetUrl { hash, .. } => hash,
TorrentSource::TorrentUrl { hash, .. } => hash,
TorrentSource::TorrentFile { hash, .. } => hash,
}
}
}
impl From<TorrentSource> for QbitTorrentSource {
fn from(value: TorrentSource) -> Self {
match value {
TorrentSource::MagnetUrl { url, .. } => QbitTorrentSource::Urls {
urls: qbit_rs::model::Sep::from([url]),
},
TorrentSource::TorrentUrl { url, .. } => QbitTorrentSource::Urls {
urls: qbit_rs::model::Sep::from([url]),
},
TorrentSource::TorrentFile {
torrent: torrents, ..
} => QbitTorrentSource::TorrentFiles { torrents },
}
}
}
pub trait TorrentContent {
fn get_name(&self) -> &str;
fn get_all_size(&self) -> u64;
fn get_progress(&self) -> f64;
fn get_curr_size(&self) -> u64;
}
impl TorrentContent for QbitTorrentContent {
fn get_name(&self) -> &str {
self.name.as_str()
}
fn get_all_size(&self) -> u64 {
self.size
}
fn get_progress(&self) -> f64 {
self.progress
}
fn get_curr_size(&self) -> u64 {
u64::clamp(
f64::round(self.get_all_size() as f64 * self.get_progress()) as u64,
0,
self.get_all_size(),
)
}
}
#[derive(Debug, Clone)]
pub enum Torrent {
Qbit {
torrent: QbitTorrent,
contents: Vec<QbitTorrentContent>,
},
}
impl Torrent {
pub fn iter_files(&self) -> impl Iterator<Item = &dyn TorrentContent> {
match self {
Torrent::Qbit { contents, .. } => {
contents.iter().map(|item| item as &dyn TorrentContent)
}
}
}
pub fn get_name(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.name.as_deref(),
}
}
pub fn get_hash(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.hash.as_deref(),
}
}
pub fn get_save_path(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.save_path.as_deref(),
}
}
pub fn get_content_path(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.content_path.as_deref(),
}
}
pub fn get_tags(&self) -> Vec<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.tags.as_deref().map_or_else(Vec::new, |s| {
s.split(',')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect_vec()
}),
}
}
pub fn get_category(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.category.as_deref(),
}
}
}
pub struct ApiClient {
headers: HeaderMap,
rate_limiter: RateLimiter,
fetch_client: reqwest::Client,
}
impl ApiClient {
pub fn new(
throttle_duration: std::time::Duration,
override_headers: Option<HeaderMap>,
) -> eyre::Result<Self> {
Ok(Self {
headers: override_headers.unwrap_or_else(HeaderMap::new),
rate_limiter: RateLimiter::new(throttle_duration),
fetch_client: reqwest::Client::builder()
.user_agent(DEFAULT_USER_AGENT)
.build()?,
})
}
pub async fn fetch_json<R, F>(&self, f: F) -> Result<R, reqwest::Error>
where
F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder,
R: DeserializeOwned,
{
self.rate_limiter
.throttle(|| async {
f(&self.fetch_client)
.headers(self.headers.clone())
.send()
.await?
.json::<R>()
.await
})
.await
}
pub async fn fetch_bytes<F>(&self, f: F) -> Result<Bytes, reqwest::Error>
where
F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder,
{
self.rate_limiter
.throttle(|| async {
f(&self.fetch_client)
.headers(self.headers.clone())
.send()
.await?
.bytes()
.await
})
.await
}
pub async fn fetch_text<F>(&self, f: F) -> Result<String, reqwest::Error>
where
F: FnOnce(&reqwest::Client) -> reqwest::RequestBuilder,
{
self.rate_limiter
.throttle(|| async {
f(&self.fetch_client)
.headers(self.headers.clone())
.send()
.await?
.text()
.await
})
.await
}
}

View File

@ -1,4 +1,7 @@
pub mod api_client;
pub mod defs;
pub mod error;
pub mod qbitorrent;
pub mod torrent_downloader;
pub mod torrent;
pub use api_client::ApiClient;

View File

@ -17,13 +17,12 @@ use quirks_path::{path_equals_as_file_url, Path, PathBuf};
use tokio::time::sleep;
use url::Url;
use super::{
defs::{Torrent, TorrentFilter, TorrentSource},
error::DownloaderError,
torrent_downloader::TorrentDownloader,
};
use super::error::DownloaderError;
use crate::{
downloaders::defs::{QbitTorrent, QbitTorrentContent, TorrentContent},
downloaders::{
defs::{QbitTorrent, QbitTorrentContent},
torrent::{Torrent, TorrentContent, TorrentDownloader, TorrentFilter, TorrentSource},
},
models::{entities::downloaders, prelude::DownloaderCategory},
};
@ -414,7 +413,7 @@ impl Debug for QBittorrentDownloader {
}
#[cfg(test)]
pub mod tests {
pub(crate) mod tests {
use itertools::Itertools;
use super::*;

View File

@ -0,0 +1,341 @@
use eyre::OptionExt;
use itertools::Itertools;
use lazy_static::lazy_static;
use librqbit_core::{
magnet::Magnet,
torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Owned},
};
use quirks_path::{Path, PathBuf};
use regex::Regex;
use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, IntoActiveModel};
use serde::{Deserialize, Serialize};
use url::Url;
use super::{
defs::{
download_bytes, QbitTorrent, QbitTorrentContent, QbitTorrentFilter, QbitTorrentSource,
MAGNET_SCHEMA,
},
error::DownloaderError,
qbitorrent::QBittorrentDownloader,
};
use crate::{
models::{bangumi, downloaders, downloaders::DownloaderCategory, downloads},
path::torrent_path::gen_bangumi_sub_path,
};
lazy_static! {
static ref TORRENT_HASH_RE: Regex = Regex::new(r"[a-fA-F0-9]{40}").unwrap();
static ref TORRENT_EXT_RE: Regex = Regex::new(r"\.torrent$").unwrap();
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TorrentFilter {
All,
Downloading,
Completed,
Paused,
Active,
Inactive,
Resumed,
Stalled,
StalledUploading,
StalledDownloading,
Errored,
}
impl From<TorrentFilter> for QbitTorrentFilter {
fn from(val: TorrentFilter) -> Self {
match val {
TorrentFilter::All => QbitTorrentFilter::All,
TorrentFilter::Downloading => QbitTorrentFilter::Downloading,
TorrentFilter::Completed => QbitTorrentFilter::Completed,
TorrentFilter::Paused => QbitTorrentFilter::Paused,
TorrentFilter::Active => QbitTorrentFilter::Active,
TorrentFilter::Inactive => QbitTorrentFilter::Inactive,
TorrentFilter::Resumed => QbitTorrentFilter::Resumed,
TorrentFilter::Stalled => QbitTorrentFilter::Stalled,
TorrentFilter::StalledUploading => QbitTorrentFilter::StalledUploading,
TorrentFilter::StalledDownloading => QbitTorrentFilter::StalledDownloading,
TorrentFilter::Errored => QbitTorrentFilter::Errored,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TorrentSource {
MagnetUrl {
url: Url,
hash: String,
},
TorrentUrl {
url: Url,
hash: String,
},
TorrentFile {
torrent: Vec<u8>,
hash: String,
name: Option<String>,
},
}
impl TorrentSource {
pub async fn parse(url: &str) -> eyre::Result<Self> {
let url = Url::parse(url)?;
let source = if url.scheme() == MAGNET_SCHEMA {
TorrentSource::from_magnet_url(url)?
} else if let Some(basename) = url
.clone()
.path_segments()
.and_then(|segments| segments.last())
{
if let (Some(match_hash), true) = (
TORRENT_HASH_RE.find(basename),
TORRENT_EXT_RE.is_match(basename),
) {
TorrentSource::from_torrent_url(url, match_hash.as_str().to_string())?
} else {
let contents = download_bytes(url).await?;
TorrentSource::from_torrent_file(contents.to_vec(), Some(basename.to_string()))?
}
} else {
let contents = download_bytes(url).await?;
TorrentSource::from_torrent_file(contents.to_vec(), None)?
};
Ok(source)
}
pub fn from_torrent_file(file: Vec<u8>, name: Option<String>) -> eyre::Result<Self> {
let torrent: TorrentMetaV1Owned =
torrent_from_bytes(&file).map_err(|_| DownloaderError::InvalidTorrentFileFormat)?;
let hash = torrent.info_hash.as_string();
Ok(TorrentSource::TorrentFile {
torrent: file,
hash,
name,
})
}
pub fn from_magnet_url(url: Url) -> eyre::Result<Self> {
if url.scheme() != MAGNET_SCHEMA {
Err(DownloaderError::InvalidUrlSchema {
found: url.scheme().to_string(),
expected: MAGNET_SCHEMA.to_string(),
}
.into())
} else {
let magnet =
Magnet::parse(url.as_str()).map_err(|_| DownloaderError::InvalidMagnetFormat {
url: url.as_str().to_string(),
})?;
let hash = magnet
.as_id20()
.ok_or_eyre("no info hash found")?
.as_string();
Ok(TorrentSource::MagnetUrl { url, hash })
}
}
pub fn from_torrent_url(url: Url, hash: String) -> eyre::Result<Self> {
Ok(TorrentSource::TorrentUrl { url, hash })
}
pub fn hash(&self) -> &str {
match self {
TorrentSource::MagnetUrl { hash, .. } => hash,
TorrentSource::TorrentUrl { hash, .. } => hash,
TorrentSource::TorrentFile { hash, .. } => hash,
}
}
}
impl From<TorrentSource> for QbitTorrentSource {
fn from(value: TorrentSource) -> Self {
match value {
TorrentSource::MagnetUrl { url, .. } => QbitTorrentSource::Urls {
urls: qbit_rs::model::Sep::from([url]),
},
TorrentSource::TorrentUrl { url, .. } => QbitTorrentSource::Urls {
urls: qbit_rs::model::Sep::from([url]),
},
TorrentSource::TorrentFile {
torrent: torrents, ..
} => QbitTorrentSource::TorrentFiles { torrents },
}
}
}
pub trait TorrentContent {
fn get_name(&self) -> &str;
fn get_all_size(&self) -> u64;
fn get_progress(&self) -> f64;
fn get_curr_size(&self) -> u64;
}
impl TorrentContent for QbitTorrentContent {
fn get_name(&self) -> &str {
self.name.as_str()
}
fn get_all_size(&self) -> u64 {
self.size
}
fn get_progress(&self) -> f64 {
self.progress
}
fn get_curr_size(&self) -> u64 {
u64::clamp(
f64::round(self.get_all_size() as f64 * self.get_progress()) as u64,
0,
self.get_all_size(),
)
}
}
#[derive(Debug, Clone)]
pub enum Torrent {
Qbit {
torrent: QbitTorrent,
contents: Vec<QbitTorrentContent>,
},
}
impl Torrent {
pub fn iter_files(&self) -> impl Iterator<Item = &dyn TorrentContent> {
match self {
Torrent::Qbit { contents, .. } => {
contents.iter().map(|item| item as &dyn TorrentContent)
}
}
}
pub fn get_name(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.name.as_deref(),
}
}
pub fn get_hash(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.hash.as_deref(),
}
}
pub fn get_save_path(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.save_path.as_deref(),
}
}
pub fn get_content_path(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.content_path.as_deref(),
}
}
pub fn get_tags(&self) -> Vec<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.tags.as_deref().map_or_else(Vec::new, |s| {
s.split(',')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect_vec()
}),
}
}
pub fn get_category(&self) -> Option<&str> {
match self {
Torrent::Qbit { torrent, .. } => torrent.category.as_deref(),
}
}
}
#[async_trait::async_trait]
pub trait TorrentDownloader {
async fn get_torrents_info(
&self,
status_filter: TorrentFilter,
category: Option<String>,
tag: Option<String>,
) -> eyre::Result<Vec<Torrent>>;
async fn add_torrents(
&self,
source: TorrentSource,
save_path: String,
category: Option<&str>,
) -> eyre::Result<()>;
async fn delete_torrents(&self, hashes: Vec<String>) -> eyre::Result<()>;
async fn rename_torrent_file(
&self,
hash: &str,
old_path: &str,
new_path: &str,
) -> eyre::Result<()>;
async fn move_torrents(&self, hashes: Vec<String>, new_path: &str) -> eyre::Result<()>;
async fn get_torrent_path(&self, hashes: String) -> eyre::Result<Option<String>>;
async fn check_connection(&self) -> eyre::Result<()>;
async fn set_torrents_category(&self, hashes: Vec<String>, category: &str) -> eyre::Result<()>;
async fn add_torrent_tags(&self, hashes: Vec<String>, tags: Vec<String>) -> eyre::Result<()>;
async fn add_category(&self, category: &str) -> eyre::Result<()>;
fn get_save_path(&self, sub_path: &Path) -> PathBuf;
async fn add_downloads_for_bangumi<'a, 'b>(
&self,
db: &'a DatabaseConnection,
downloads: &[&downloads::Model],
mut bangumi: bangumi::Model,
) -> eyre::Result<bangumi::Model> {
if bangumi.save_path.is_none() {
let gen_sub_path = gen_bangumi_sub_path(&bangumi);
let mut bangumi_active = bangumi.into_active_model();
bangumi_active.save_path = ActiveValue::Set(Some(gen_sub_path.to_string()));
bangumi = bangumi_active.update(db).await?;
}
let sub_path = bangumi
.save_path
.as_ref()
.unwrap_or_else(|| unreachable!("must have a sub path"));
let mut torrent_urls = vec![];
for m in downloads.iter() {
torrent_urls.push(Url::parse(&m.url as &str)?);
}
// make sequence to prevent too fast to be banned
for d in downloads.iter() {
let source = TorrentSource::parse(&d.url).await?;
self.add_torrents(source, sub_path.clone(), Some("bangumi"))
.await?;
}
Ok(bangumi)
}
}
pub async fn build_torrent_downloader_from_downloader_model(
model: downloaders::Model,
) -> eyre::Result<Box<dyn TorrentDownloader>> {
Ok(Box::new(match &model.category {
DownloaderCategory::QBittorrent => {
QBittorrentDownloader::from_downloader_model(model).await?
}
}))
}

View File

@ -1,96 +0,0 @@
use downloaders::DownloaderCategory;
use quirks_path::{Path, PathBuf};
use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, IntoActiveModel};
use url::Url;
use super::{
defs::{Torrent, TorrentFilter, TorrentSource},
qbitorrent::QBittorrentDownloader,
};
use crate::{
models::{bangumi, downloaders, downloads},
path::torrent_path::gen_bangumi_sub_path,
};
#[async_trait::async_trait]
pub trait TorrentDownloader {
async fn get_torrents_info(
&self,
status_filter: TorrentFilter,
category: Option<String>,
tag: Option<String>,
) -> eyre::Result<Vec<Torrent>>;
async fn add_torrents(
&self,
source: TorrentSource,
save_path: String,
category: Option<&str>,
) -> eyre::Result<()>;
async fn delete_torrents(&self, hashes: Vec<String>) -> eyre::Result<()>;
async fn rename_torrent_file(
&self,
hash: &str,
old_path: &str,
new_path: &str,
) -> eyre::Result<()>;
async fn move_torrents(&self, hashes: Vec<String>, new_path: &str) -> eyre::Result<()>;
async fn get_torrent_path(&self, hashes: String) -> eyre::Result<Option<String>>;
async fn check_connection(&self) -> eyre::Result<()>;
async fn set_torrents_category(&self, hashes: Vec<String>, category: &str) -> eyre::Result<()>;
async fn add_torrent_tags(&self, hashes: Vec<String>, tags: Vec<String>) -> eyre::Result<()>;
async fn add_category(&self, category: &str) -> eyre::Result<()>;
fn get_save_path(&self, sub_path: &Path) -> PathBuf;
async fn add_downloads_for_bangumi<'a, 'b>(
&self,
db: &'a DatabaseConnection,
downloads: &[&downloads::Model],
mut bangumi: bangumi::Model,
) -> eyre::Result<bangumi::Model> {
if bangumi.save_path.is_none() {
let gen_sub_path = gen_bangumi_sub_path(&bangumi);
let mut bangumi_active = bangumi.into_active_model();
bangumi_active.save_path = ActiveValue::Set(Some(gen_sub_path.to_string()));
bangumi = bangumi_active.update(db).await?;
}
let sub_path = bangumi
.save_path
.as_ref()
.unwrap_or_else(|| unreachable!("must have a sub path"));
let mut torrent_urls = vec![];
for m in downloads.iter() {
torrent_urls.push(Url::parse(&m.url as &str)?);
}
// make sequence to prevent too fast to be banned
for d in downloads.iter() {
let source = TorrentSource::parse(&d.url).await?;
self.add_torrents(source, sub_path.clone(), Some("bangumi"))
.await?;
}
Ok(bangumi)
}
}
pub async fn build_torrent_downloader_from_downloader_model(
model: downloaders::Model,
) -> eyre::Result<Box<dyn TorrentDownloader>> {
Ok(Box::new(match &model.category {
DownloaderCategory::QBittorrent => {
QBittorrentDownloader::from_downloader_model(model).await?
}
}))
}

View File

@ -3,13 +3,13 @@
pub mod app;
pub mod config;
pub mod controllers;
pub mod dal;
pub mod downloaders;
pub mod migrations;
pub mod models;
pub mod parsers;
pub mod path;
pub mod search;
pub mod storage;
pub mod tasks;
pub mod views;
pub mod workers;

View File

@ -18,6 +18,7 @@ pub enum Subscribers {
Pid,
DisplayName,
DownloaderId,
BangumiConf,
}
#[derive(DeriveIden)]
@ -36,18 +37,42 @@ pub enum Subscriptions {
pub enum Bangumi {
Table,
Id,
DisplayName,
SubscriptionId,
DisplayName,
OfficialTitle,
Fansub,
Season,
Filter,
PosterLink,
SavePath,
LastEp,
BangumiConfOverride,
}
#[derive(DeriveIden)]
pub enum Episodes {
Table,
Id,
OriginTitle,
OfficialTitle,
DisplayName,
NameZh,
NameJp,
NameEn,
SNameZh,
SNameJp,
SNameEn,
BangumiId,
OutputName,
DownloadId,
SavePath,
Resolution,
Season,
SeasonRaw,
Fansub,
PosterLink,
HomePage,
Subtitle,
Source,
}
#[derive(DeriveIden)]
@ -55,13 +80,14 @@ pub enum Downloads {
Table,
Id,
SubscriptionId,
OriginalName,
OriginTitle,
DisplayName,
Status,
CurrSize,
AllSize,
Mime,
Url,
HomePage,
}
#[derive(DeriveIden)]
@ -73,7 +99,7 @@ pub enum Downloaders {
Password,
Username,
SubscriberId,
DownloadPath,
SavePath,
}
#[async_trait::async_trait]

View File

@ -1,3 +1,4 @@
use loco_rs::schema::jsonb_null;
use sea_orm_migration::{prelude::*, schema::*};
use super::defs::{
@ -20,6 +21,7 @@ impl MigrationTrait for Migration {
.col(pk_auto(Subscribers::Id))
.col(string_len_uniq(Subscribers::Pid, 64))
.col(string(Subscribers::DisplayName))
.col(jsonb_null(Subscribers::BangumiConf))
.to_owned(),
)
.await?;
@ -84,8 +86,16 @@ impl MigrationTrait for Migration {
.create_table(
table_auto(Bangumi::Table)
.col(pk_auto(Bangumi::Id))
.col(text(Bangumi::DisplayName))
.col(integer(Bangumi::SubscriptionId))
.col(text(Bangumi::DisplayName))
.col(text(Bangumi::OfficialTitle))
.col(string_null(Bangumi::Fansub))
.col(unsigned(Bangumi::Season))
.col(jsonb_null(Bangumi::Filter))
.col(text_null(Bangumi::PosterLink))
.col(text_null(Bangumi::SavePath))
.col(unsigned(Bangumi::LastEp))
.col(jsonb_null(Bangumi::BangumiConfOverride))
.foreign_key(
ForeignKey::create()
.name("fk_bangumi_subscription_id")
@ -94,6 +104,27 @@ impl MigrationTrait for Migration {
.on_update(ForeignKeyAction::Restrict)
.on_delete(ForeignKeyAction::Cascade),
)
.index(
Index::create()
.name("idx_bangumi_official_title")
.table(Bangumi::Table)
.col(Bangumi::OfficialTitle)
.unique(),
)
.index(
Index::create()
.name("idx_bangumi_fansub")
.table(Bangumi::Table)
.col(Bangumi::Fansub)
.unique(),
)
.index(
Index::create()
.name("idx_bangumi_display_name")
.table(Bangumi::Table)
.col(Bangumi::DisplayName)
.unique(),
)
.to_owned(),
)
.await?;
@ -106,9 +137,26 @@ impl MigrationTrait for Migration {
.create_table(
table_auto(Episodes::Table)
.col(pk_auto(Episodes::Id))
.col(text(Episodes::OriginTitle))
.col(text(Episodes::OfficialTitle))
.col(text(Episodes::DisplayName))
.col(text_null(Episodes::NameZh))
.col(text_null(Episodes::NameJp))
.col(text_null(Episodes::NameEn))
.col(text_null(Episodes::SNameZh))
.col(text_null(Episodes::SNameJp))
.col(text_null(Episodes::SNameEn))
.col(integer(Episodes::BangumiId))
.col(text(Episodes::OutputName))
.col(integer(Episodes::DownloadId))
.col(text_null(Episodes::SavePath))
.col(string_null(Episodes::Resolution))
.col(integer(Episodes::Season))
.col(string_null(Episodes::SeasonRaw))
.col(string_null(Episodes::Fansub))
.col(text_null(Episodes::PosterLink))
.col(text_null(Episodes::HomePage))
.col(jsonb_null(Episodes::Subtitle))
.col(text_null(Episodes::Source))
.foreign_key(
ForeignKey::create()
.name("fk_episode_bangumi_id")
@ -117,6 +165,24 @@ impl MigrationTrait for Migration {
.on_update(ForeignKeyAction::Restrict)
.on_delete(ForeignKeyAction::Cascade),
)
.index(
Index::create()
.name("idx_episode_official_title")
.table(Episodes::Table)
.col(Episodes::OfficialTitle),
)
.index(
Index::create()
.name("idx_episode_fansub")
.table(Episodes::Table)
.col(Episodes::Fansub),
)
.index(
Index::create()
.name("idx_episode_display_name")
.table(Episodes::Table)
.col(Episodes::DisplayName),
)
.to_owned(),
)
.await?;

View File

@ -38,8 +38,8 @@ impl MigrationTrait for Migration {
.create_table(
table_auto(Downloads::Table)
.col(pk_auto(Downloads::Id))
.col(string(Downloads::OriginalName))
.col(string(Downloads::DisplayName))
.col(text(Downloads::OriginTitle))
.col(text(Downloads::DisplayName))
.col(integer(Downloads::SubscriptionId))
.col(enumeration(
Downloads::Status,
@ -51,15 +51,10 @@ impl MigrationTrait for Migration {
DownloadMimeEnum,
DownloadMime::iden_values(),
))
.col(big_unsigned(Downloads::AllSize))
.col(big_unsigned(Downloads::CurrSize))
.col(big_unsigned_null(Downloads::AllSize))
.col(big_unsigned_null(Downloads::CurrSize))
.col(text(Downloads::Url))
.index(
Index::create()
.table(Downloads::Table)
.col(Downloads::Url)
.name("idx_download_url"),
)
.col(text_null(Downloads::HomePage))
.foreign_key(
ForeignKey::create()
.name("fk_download_subscription_id")
@ -68,6 +63,18 @@ impl MigrationTrait for Migration {
.on_update(ForeignKeyAction::Restrict)
.on_delete(ForeignKeyAction::Cascade),
)
.index(
Index::create()
.name("idx_download_url")
.table(Downloads::Table)
.col(Downloads::Url),
)
.index(
Index::create()
.name("idx_download_home_page")
.table(Downloads::Table)
.col(Downloads::HomePage),
)
.to_owned(),
)
.await?;

View File

@ -30,7 +30,7 @@ impl MigrationTrait for Migration {
DownloaderCategoryEnum,
DownloaderCategory::iden_values(),
))
.col(text(Downloaders::DownloadPath))
.col(text(Downloaders::SavePath))
.col(integer(Downloaders::SubscriberId))
.foreign_key(
ForeignKey::create()

View File

@ -1,8 +1,10 @@
use std::collections::HashSet;
use itertools::Itertools;
use regex::Regex;
use sea_orm::entity::prelude::*;
pub use super::entities::bangumi::*;
use crate::models::downloads;
#[async_trait::async_trait]
impl ActiveModelBehavior for ActiveModel {}
@ -26,6 +28,29 @@ impl BangumiFilter {
}
impl Model {
pub async fn search_all() {}
pub async fn match_list(dnlds: Vec<downloads::Model>) {}
pub fn get_unique_key(&self) -> BangumiUniqueKey {
BangumiUniqueKey {
official_title: self.official_title.clone(),
season: self.season,
fansub: self.fansub.clone(),
}
}
pub async fn find_by_unique_keys(
db: &DatabaseConnection,
unique_keys: impl Iterator<Item = &BangumiUniqueKey>,
) -> eyre::Result<Vec<Self>> {
let unique_keys = unique_keys.collect::<HashSet<_>>();
let mut found = Entity::find()
.filter(Column::OfficialTitle.is_in(unique_keys.iter().map(|k| &k.official_title)))
.all(db)
.await?;
found = found
.into_iter()
.filter(|m| unique_keys.contains(&m.get_unique_key()))
.collect_vec();
Ok(found)
}
}

View File

@ -1,5 +1,5 @@
use sea_orm::{
sea_query::{Expr, InsertStatement, IntoColumnRef, Query, SimpleExpr},
sea_query::{Expr, InsertStatement, Query, SimpleExpr},
ActiveModelTrait, ActiveValue, ColumnTrait, ConnectionTrait, EntityName, EntityTrait,
FromQueryResult, Iterable, SelectModel, SelectorRaw, TryGetable,
};

View File

@ -1,65 +1,25 @@
use itertools::Itertools;
use loco_rs::app::AppContext;
use sea_orm::{
prelude::*,
sea_query::{InsertStatement, OnConflict},
};
use sea_orm::{prelude::*, ActiveValue};
pub use crate::models::entities::downloads::*;
use crate::{
models::{
db_utils::insert_many_with_returning_all,
subscriptions::{self, SubscriptionCategory},
},
parsers::mikan::{
mikan_client::MikanClient, parse_mikan_rss_items_from_rss_link, MikanRssItem,
},
};
use crate::parsers::mikan::MikanRssItem;
#[async_trait::async_trait]
impl ActiveModelBehavior for ActiveModel {}
impl ActiveModel {
pub fn from_mikan_rss_item(m: MikanRssItem, subscription_id: i32) -> Self {
todo!()
}
}
impl Model {
pub async fn pull_subscription(
ctx: AppContext,
subscription: &subscriptions::Model,
) -> eyre::Result<Vec<Model>> {
let db = &ctx.db;
match &subscription.category {
SubscriptionCategory::Mikan => {
let subscriber_id = subscription.subscriber_id;
let client = MikanClient::new(subscriber_id).await?;
let items =
parse_mikan_rss_items_from_rss_link(&client, &subscription.source_url).await?;
let all_items = items.collect::<Vec<_>>();
if all_items.is_empty() {
return Ok(vec![]);
}
let new_items = all_items
.into_iter()
.map(|i| ActiveModel::from_mikan_rss_item(i, subscription.id))
.collect_vec();
// insert and filter out duplicated items
let new_items: Vec<Model> =
insert_many_with_returning_all(db, new_items, |stat: &mut InsertStatement| {
stat.on_conflict(OnConflict::column(Column::Url).do_nothing().to_owned());
})
.await?;
Ok(new_items)
}
_ => {
todo!("other subscription categories")
}
pub fn from_mikan_rss_item(rss_item: MikanRssItem, subscription_id: i32) -> Self {
let download_mime = rss_item.get_download_mime();
Self {
origin_title: ActiveValue::Set(rss_item.title.clone()),
display_name: ActiveValue::Set(rss_item.title),
subscription_id: ActiveValue::Set(subscription_id),
status: ActiveValue::Set(DownloadStatus::Pending),
mime: ActiveValue::Set(download_mime),
url: ActiveValue::Set(rss_item.url),
all_size: ActiveValue::Set(rss_item.content_length),
curr_size: ActiveValue::Set(Some(0)),
homepage: ActiveValue::Set(rss_item.homepage),
..Default::default()
}
}
}

View File

@ -37,7 +37,7 @@ pub enum BangumiRenameMethod {
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult)]
pub struct SubscriberBangumiConfigOverride {
pub struct SubscribeBangumiConfigOverride {
pub leading_fansub_tag: Option<bool>,
pub complete_history_episodes: Option<bool>,
pub rename_method: Option<BangumiRenameMethod>,
@ -50,6 +50,13 @@ pub struct BangumiFilter {
pub regex_filters: Option<Vec<String>>,
}
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct BangumiUniqueKey {
pub official_title: String,
pub season: u32,
pub fansub: Option<String>,
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "bangumi")]
pub struct Model {
@ -60,15 +67,13 @@ pub struct Model {
pub subscription_id: i32,
pub display_name: String,
pub official_title: String,
pub season: i32,
pub season_raw: Option<String>,
pub fansub: Option<String>,
pub season: u32,
pub filter: Option<BangumiFilter>,
pub rss_link: Option<String>,
pub poster_link: Option<String>,
pub save_path: Option<String>,
pub deleted: bool,
pub subscriber_conf_override: Option<SubscriberBangumiConfigOverride>,
pub last_ep: u32,
pub bangumi_conf_override: Option<SubscribeBangumiConfigOverride>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -18,9 +18,7 @@ pub enum DownloaderCategory {
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "downloaders")]
pub struct Model {
#[sea_orm(column_type = "Timestamp")]
pub created_at: DateTime,
#[sea_orm(column_type = "Timestamp")]
pub updated_at: DateTime,
#[sea_orm(primary_key)]
pub id: i32,

View File

@ -1,4 +1,4 @@
use sea_orm::{entity::prelude::*, FromJsonQueryResult};
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(
@ -41,7 +41,7 @@ pub struct Model {
pub updated_at: DateTime,
#[sea_orm(primary_key)]
pub id: i32,
pub origin_name: String,
pub origin_title: String,
pub display_name: String,
pub subscription_id: i32,
pub status: DownloadStatus,

View File

@ -10,7 +10,7 @@ pub struct Model {
pub updated_at: DateTime,
#[sea_orm(primary_key)]
pub id: i32,
pub raw_name: String,
pub origin_title: String,
pub official_title: String,
pub display_name: String,
pub name_zh: Option<String>,
@ -20,16 +20,15 @@ pub struct Model {
pub s_name_jp: Option<String>,
pub s_name_en: Option<String>,
pub bangumi_id: i32,
pub download_id: i32,
pub save_path: String,
pub download_id: Option<i32>,
pub save_path: Option<String>,
pub resolution: Option<String>,
pub season: i32,
pub season: u32,
pub season_raw: Option<String>,
pub fansub: Option<String>,
pub poster_link: Option<String>,
pub home_page: Option<String>,
pub subtitle: Option<Vec<String>>,
pub deleted: bool,
pub source: Option<String>,
}

View File

@ -6,14 +6,14 @@ use serde::{Deserialize, Serialize};
use super::bangumi::BangumiRenameMethod;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult)]
pub struct SubscriberBangumiConfig {
pub struct SubscribeBangumiConfig {
pub leading_fansub_tag: bool,
pub complete_history_episodes: bool,
pub rename_method: BangumiRenameMethod,
pub remove_bad_torrent: bool,
}
impl Default for SubscriberBangumiConfig {
impl Default for SubscribeBangumiConfig {
fn default() -> Self {
Self {
leading_fansub_tag: false,
@ -35,7 +35,7 @@ pub struct Model {
pub pid: String,
pub display_name: String,
pub downloader_id: Option<i32>,
pub bangumi_conf: SubscriberBangumiConfig,
pub bangumi_conf: SubscribeBangumiConfig,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -1,37 +1,43 @@
use sea_orm::{entity::prelude::*, ActiveValue};
pub use super::entities::episodes::*;
use crate::models::{bangumi, downloads};
use crate::{
models::downloads,
parsers::{mikan::MikanEpisodeMeta, raw::RawEpisodeMeta},
};
#[async_trait::async_trait]
impl ActiveModelBehavior for ActiveModel {}
impl ActiveModel {
pub async fn from_mikan_rss_item(dl: &downloads::Model, bgm: &bangumi::Model) -> Self {
let _ = Self {
raw_name: ActiveValue::Set(dl.origin_name.clone()),
official_title: ActiveValue::Set(bgm.official_title.clone()),
display_name: ActiveValue::Set(bgm.display_name.clone()),
name_zh: Default::default(),
name_jp: Default::default(),
name_en: Default::default(),
s_name_zh: Default::default(),
s_name_jp: Default::default(),
s_name_en: Default::default(),
bangumi_id: Default::default(),
download_id: Default::default(),
save_path: Default::default(),
resolution: Default::default(),
season: Default::default(),
season_raw: Default::default(),
fansub: Default::default(),
poster_link: Default::default(),
home_page: Default::default(),
subtitle: Default::default(),
deleted: Default::default(),
source: Default::default(),
pub fn from_mikan_meta(
bangumi_id: i32,
dl: downloads::Model,
raw_meta: RawEpisodeMeta,
mikan_meta: MikanEpisodeMeta,
mikan_poster: Option<String>,
) -> Self {
Self {
origin_title: ActiveValue::Set(dl.origin_title),
official_title: ActiveValue::Set(mikan_meta.official_title.clone()),
display_name: ActiveValue::Set(mikan_meta.official_title),
name_zh: ActiveValue::Set(raw_meta.name_zh),
name_jp: ActiveValue::Set(raw_meta.name_jp),
name_en: ActiveValue::Set(raw_meta.name_en),
s_name_zh: ActiveValue::Set(raw_meta.s_name_zh),
s_name_jp: ActiveValue::Set(raw_meta.s_name_jp),
s_name_en: ActiveValue::Set(raw_meta.s_name_en),
bangumi_id: ActiveValue::Set(bangumi_id),
download_id: ActiveValue::Set(Some(dl.id)),
resolution: ActiveValue::Set(raw_meta.resolution),
season: ActiveValue::Set(raw_meta.season),
season_raw: ActiveValue::Set(raw_meta.season_raw),
fansub: ActiveValue::Set(raw_meta.fansub),
poster_link: ActiveValue::Set(mikan_poster),
home_page: ActiveValue::Set(dl.homepage),
subtitle: ActiveValue::Set(raw_meta.sub),
source: ActiveValue::Set(raw_meta.source),
..Default::default()
};
todo!()
}
}
}

View File

@ -1,7 +1,28 @@
use sea_orm::{entity::prelude::*, ActiveValue};
use std::collections::HashMap;
use itertools::Itertools;
use loco_rs::app::AppContext;
use sea_orm::{
entity::prelude::*,
sea_query::{InsertStatement, OnConflict},
ActiveValue,
};
use serde::{Deserialize, Serialize};
use tracing::{event, instrument, Level};
pub use super::entities::subscriptions::{self, *};
use crate::{
models::{bangumi, db_utils::insert_many_with_returning_all, downloads, episodes},
parsers::{
mikan::{
parse_episode_meta_from_mikan_homepage, parse_mikan_rss_items_from_rss_link,
MikanClient, MikanEpisodeMeta,
},
raw::{parse_episode_meta_from_raw_name, RawEpisodeMeta},
},
path::extract_extname_from_url,
storage::{AppContextDalExt, DalContentType},
};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubscriptionCreateFromRssDto {
@ -80,4 +101,190 @@ impl Model {
.await?;
Ok(())
}
#[instrument(
fields(subscriber_id = "self.subscriber_id", subscription_id = "self.id"),
skip(self, db, ctx)
)]
pub async fn pull_item(&self, db: &DatabaseConnection, ctx: &AppContext) -> eyre::Result<()> {
let subscription = self;
let subscription_id = subscription.id;
match &subscription.category {
SubscriptionCategory::Mikan => {
let subscriber_id = subscription.subscriber_id;
let mikan_client = MikanClient::new(subscriber_id).await?;
let mikan_rss_items =
parse_mikan_rss_items_from_rss_link(&mikan_client, &subscription.source_url)
.await?;
let all_items = mikan_rss_items.collect::<Vec<_>>();
if all_items.is_empty() {
return Ok(());
}
let new_downloads = all_items
.into_iter()
.map(|rss_item| {
downloads::ActiveModel::from_mikan_rss_item(rss_item, subscription.id)
})
.collect_vec();
// insert and filter out duplicated items
let new_downloads: Vec<downloads::Model> = insert_many_with_returning_all(
db,
new_downloads,
|stat: &mut InsertStatement| {
stat.on_conflict(
OnConflict::column(downloads::Column::Url)
.do_nothing()
.to_owned(),
);
},
)
.await?;
pub struct MikanEpMetaBundle {
pub download: downloads::Model,
pub mikan: MikanEpisodeMeta,
pub raw: RawEpisodeMeta,
pub poster: Option<String>,
}
let mut ep_metas: HashMap<bangumi::BangumiUniqueKey, Vec<MikanEpMetaBundle>> =
HashMap::new();
let dal = ctx.get_dal_unwrap().await;
{
for dl in new_downloads {
let mut mikan_meta = if let Some(homepage) = dl.homepage.as_deref() {
match parse_episode_meta_from_mikan_homepage(&mikan_client, homepage)
.await
{
Ok(mikan_meta) => mikan_meta,
Err(e) => {
let error: &dyn std::error::Error = e.as_ref();
event!(
Level::ERROR,
desc = "failed to parse episode meta from mikan homepage",
homepage = homepage,
error = error
);
continue;
}
}
} else {
continue;
};
let mikan_poster_link = if let Some(poster) = mikan_meta.poster.take() {
if let Some(extname) = extract_extname_from_url(&poster.origin_url) {
let result = dal
.store_blob(
DalContentType::Poster,
&extname,
poster.data,
&subscriber_id.to_string(),
)
.await;
match result {
Ok(stored_url) => Some(stored_url.to_string()),
Err(e) => {
let error: &dyn std::error::Error = e.as_ref();
event!(
Level::ERROR,
desc = "failed to store mikan meta poster",
origin_url = poster.origin_url.as_str(),
error = error
);
None
}
}
} else {
event!(
Level::ERROR,
desc = "failed to extract mikan meta poster extname",
origin_url = poster.origin_url.as_str(),
);
None
}
} else {
None
};
let raw_meta = match parse_episode_meta_from_raw_name(&dl.origin_title) {
Ok(raw_meta) => raw_meta,
Err(e) => {
let error: &dyn std::error::Error = e.as_ref();
event!(
Level::ERROR,
desc = "failed to parse episode meta from origin name",
origin_name = &dl.origin_title,
error = error
);
continue;
}
};
let key = bangumi::BangumiUniqueKey {
official_title: mikan_meta.official_title.clone(),
season: raw_meta.season,
fansub: raw_meta.fansub.clone(),
};
let meta = MikanEpMetaBundle {
download: dl,
mikan: mikan_meta,
raw: raw_meta,
poster: mikan_poster_link,
};
ep_metas.entry(key).or_default().push(meta);
}
}
for (_, eps) in ep_metas {
let meta = eps.first().unwrap_or_else(|| {
unreachable!(
"subscriptions pull items bangumi must have at least one episode meta"
)
});
let last_ep = eps.iter().fold(0, |acc, ep| acc.max(ep.raw.episode_index));
let official_title = &meta.mikan.official_title;
let bgm = bangumi::ActiveModel {
subscription_id: ActiveValue::Set(subscription_id),
display_name: ActiveValue::Set(official_title.clone()),
official_title: ActiveValue::Set(official_title.clone()),
fansub: ActiveValue::Set(meta.raw.fansub.clone()),
season: ActiveValue::Set(meta.raw.season),
poster_link: ActiveValue::Set(meta.poster.clone()),
last_ep: ActiveValue::Set(last_ep),
..Default::default()
};
let bgm = bangumi::Entity::insert(bgm)
.on_conflict(
OnConflict::columns([
bangumi::Column::OfficialTitle,
bangumi::Column::Season,
bangumi::Column::Fansub,
])
.update_columns([bangumi::Column::LastEp])
.to_owned(),
)
.exec_with_returning(db)
.await?;
let eps = eps.into_iter().map(|ep| {
episodes::ActiveModel::from_mikan_meta(
bgm.id,
ep.download,
ep.raw,
ep.mikan,
ep.poster,
)
});
episodes::Entity::insert_many(eps).exec(db).await?;
}
Ok(())
}
_ => {
todo!("other subscription categories")
}
}
}
}

View File

@ -16,4 +16,6 @@ pub enum ParseError {
LanguageTagError(#[from] oxilangtag::LanguageTagParseError),
#[error("Unsupported language preset: {0}")]
UnsupportedLanguagePreset(String),
#[error("Parse episode meta error, get empty official title, homepage = {0}")]
MikanEpisodeMetaEmptyOfficialTitleError(String),
}

View File

@ -2,7 +2,7 @@ use std::{ops::Deref, sync::Arc};
use tokio::sync::OnceCell;
use crate::downloaders::defs::ApiClient;
use crate::downloaders::ApiClient;
pub struct MikanClient {
api_client: ApiClient,

View File

@ -3,17 +3,26 @@ use html_escape::decode_html_entities;
use lazy_static::lazy_static;
use lightningcss::{properties::Property, values::image::Image};
use regex::Regex;
use reqwest::IntoUrl;
use tracing::instrument;
use url::Url;
use crate::parsers::{
errors::ParseError,
html::{get_tag_style, query_selector_first_tag},
mikan::mikan_client::MikanClient,
};
#[derive(Clone, Debug)]
pub struct MikanEpisodeMetaPosterBlob {
pub origin_url: Url,
pub data: Bytes,
}
#[derive(Clone, Debug)]
pub struct MikanEpisodeMeta {
pub homepage: Url,
pub poster_data: Option<Bytes>,
pub origin_poster_src: Option<Url>,
pub poster: Option<MikanEpisodeMetaPosterBlob>,
pub official_title: String,
}
@ -21,12 +30,14 @@ lazy_static! {
static ref MIKAN_TITLE_SEASON: Regex = Regex::new("第.*季").unwrap();
}
#[instrument(skip(client, url))]
pub async fn parse_episode_meta_from_mikan_homepage(
client: &MikanClient,
url: Url,
) -> eyre::Result<Option<MikanEpisodeMeta>> {
url: impl IntoUrl,
) -> eyre::Result<MikanEpisodeMeta> {
let url = url.into_url()?;
let url_host = url.origin().unicode_serialization();
let content = client.fetch_text(|f| f.get(url.as_str())).await?;
let content = client.fetch_text(|f| f.get(url.clone())).await?;
let dom = tl::parse(&content, tl::ParserOptions::default())?;
let parser = dom.parser();
let poster_node = query_selector_first_tag(&dom, r"div.bangumi-poster", parser);
@ -62,12 +73,19 @@ pub async fn parse_episode_meta_from_mikan_homepage(
p.set_query(None);
p
});
let poster_data = if let Some(p) = origin_poster_src.as_ref() {
client.fetch_bytes(|f| f.get(p.clone())).await.ok()
let poster = if let Some(p) = origin_poster_src {
client
.fetch_bytes(|f| f.get(p.clone()))
.await
.ok()
.map(|data| MikanEpisodeMetaPosterBlob {
data,
origin_url: p,
})
} else {
None
};
let meta = official_title_node
let official_title = official_title_node
.map(|s| s.inner_text(parser))
.and_then(|official_title| {
let title = MIKAN_TITLE_SEASON
@ -80,13 +98,13 @@ pub async fn parse_episode_meta_from_mikan_homepage(
Some(title)
}
})
.map(|title| MikanEpisodeMeta {
homepage: url,
poster_data,
official_title: title,
origin_poster_src,
});
Ok(meta)
.ok_or_else(|| ParseError::MikanEpisodeMetaEmptyOfficialTitleError(url.to_string()))?;
Ok(MikanEpisodeMeta {
homepage: url,
poster,
official_title,
})
}
#[cfg(test)]
@ -105,24 +123,25 @@ mod test {
let client = MikanClient::new(0).await.expect("should get mikan client");
if let Some(ep_meta) =
parse_episode_meta_from_mikan_homepage(&client, url.clone()).await?
let ep_meta = parse_episode_meta_from_mikan_homepage(&client, url.clone()).await?;
{
assert_eq!(ep_meta.homepage, url);
assert_eq!(ep_meta.official_title, "葬送的芙莉莲");
assert_eq!(
ep_meta.origin_poster_src,
ep_meta.poster.clone().map(|p| p.origin_url),
Some(Url::parse(
"https://mikanani.me/images/Bangumi/202309/5ce9fed1.jpg"
)?)
);
let u8_data = ep_meta.poster_data.expect("should have poster data");
let u8_data = ep_meta
.poster
.clone()
.map(|p| p.data)
.expect("should have poster data");
assert!(
u8_data.starts_with(&[255, 216, 255, 224]),
"should start with valid jpeg data magic number"
);
} else {
panic!("can not find mikan episode title")
}
Ok(())

View File

@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use crate::{
downloaders::defs::BITTORRENT_MIME_TYPE,
models::prelude::DownloadMime,
parsers::{errors::ParseError, mikan::mikan_client::MikanClient},
};
@ -17,6 +18,12 @@ pub struct MikanRssItem {
pub pub_date: Option<i64>,
}
impl MikanRssItem {
pub fn get_download_mime(&self) -> DownloadMime {
DownloadMime::BitTorrent
}
}
impl TryFrom<rss::Item> for MikanRssItem {
type Error = ParseError;

View File

@ -2,5 +2,6 @@ pub mod mikan_client;
pub mod mikan_ep_parser;
pub mod mikan_rss_parser;
pub use mikan_client::MikanClient;
pub use mikan_ep_parser::{parse_episode_meta_from_mikan_homepage, MikanEpisodeMeta};
pub use mikan_rss_parser::{parse_mikan_rss_items_from_rss_link, MikanRssItem};

View File

@ -43,19 +43,19 @@ lazy_static! {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawEpisodeMeta {
name_en: Option<String>,
name_en_no_season: Option<String>,
name_jp: Option<String>,
name_jp_no_season: Option<String>,
name_zh: Option<String>,
name_zh_no_season: Option<String>,
season: i32,
season_raw: Option<String>,
episode_index: i32,
sub: Option<String>,
source: Option<String>,
fansub: Option<String>,
resolution: Option<String>,
pub name_en: Option<String>,
pub s_name_en: Option<String>,
pub name_jp: Option<String>,
pub s_name_jp: Option<String>,
pub name_zh: Option<String>,
pub s_name_zh: Option<String>,
pub season: u32,
pub season_raw: Option<String>,
pub episode_index: u32,
pub sub: Option<Vec<String>>,
pub source: Option<String>,
pub fansub: Option<String>,
pub resolution: Option<String>,
}
fn extract_fansub(raw_name: &str) -> Option<&str> {
@ -110,7 +110,7 @@ fn title_body_pre_process(title_body: &str, fansub: Option<&str>) -> eyre::Resul
Ok(raw.to_string())
}
fn extract_season_from_title_body(title_body: &str) -> (String, Option<String>, i32) {
fn extract_season_from_title_body(title_body: &str) -> (String, Option<String>, u32) {
let name_and_season = EN_BRACKET_SPLIT_RE.replace_all(title_body, " ");
let seasons = SEASON_EXTRACT_SEASON_ALL_RE
.find(&name_and_season)
@ -122,7 +122,7 @@ fn extract_season_from_title_body(title_body: &str) -> (String, Option<String>,
return (title_body.to_string(), None, 1);
}
let mut season = 1;
let mut season = 1u32;
let mut season_raw = None;
let name = SEASON_EXTRACT_SEASON_ALL_RE.replace_all(&name_and_season, "");
@ -131,7 +131,7 @@ fn extract_season_from_title_body(title_body: &str) -> (String, Option<String>,
if let Some(m) = SEASON_EXTRACT_SEASON_EN_PREFIX_RE.find(s) {
if let Ok(s) = SEASON_EXTRACT_SEASON_ALL_RE
.replace_all(m.as_str(), "")
.parse::<i32>()
.parse::<u32>()
{
season = s;
break;
@ -140,7 +140,7 @@ fn extract_season_from_title_body(title_body: &str) -> (String, Option<String>,
if let Some(m) = SEASON_EXTRACT_SEASON_EN_NTH_RE.find(s) {
if let Some(s) = DIGIT_1PLUS_REG
.find(m.as_str())
.and_then(|s| s.as_str().parse::<i32>().ok())
.and_then(|s| s.as_str().parse::<u32>().ok())
{
season = s;
break;
@ -149,13 +149,13 @@ fn extract_season_from_title_body(title_body: &str) -> (String, Option<String>,
if let Some(m) = SEASON_EXTRACT_SEASON_ZH_PREFIX_RE.find(s) {
if let Ok(s) = SEASON_EXTRACT_SEASON_ZH_PREFIX_SUB_RE
.replace(m.as_str(), "")
.parse::<i32>()
.parse::<u32>()
{
season = s;
break;
}
if let Some(m) = ZH_NUM_RE.find(m.as_str()) {
season = ZH_NUM_MAP[m.as_str()];
season = ZH_NUM_MAP[m.as_str()] as u32;
break;
}
}
@ -207,21 +207,25 @@ fn extract_name_from_title_body_name_section(
(name_en, name_zh, name_jp)
}
fn extract_episode_index_from_title_episode(title_episode: &str) -> Option<i32> {
fn extract_episode_index_from_title_episode(title_episode: &str) -> Option<u32> {
DIGIT_1PLUS_REG
.find(title_episode)?
.as_str()
.parse::<i32>()
.parse::<u32>()
.ok()
}
fn clear_sub(sub: Option<String>) -> Option<String> {
sub.map(|s| CLEAR_SUB_RE.replace_all(&s, "").to_string())
fn clear_sub(sub: Option<Vec<String>>) -> Option<Vec<String>> {
sub.map(|s| {
s.into_iter()
.map(|s| CLEAR_SUB_RE.replace_all(&s, "").to_string())
.collect_vec()
})
}
fn extract_tags_from_title_extra(
title_extra: &str,
) -> (Option<String>, Option<String>, Option<String>) {
) -> (Option<Vec<String>>, Option<String>, Option<String>) {
let replaced = TAGS_EXTRACT_SPLIT_RE.replace_all(title_extra, " ");
let elements = replaced
.split(' ')
@ -229,12 +233,19 @@ fn extract_tags_from_title_extra(
.filter(|s| !s.is_empty())
.collect_vec();
let mut sub = None;
let mut sub: Option<Vec<String>> = None;
let mut resolution = None;
let mut source = None;
for element in elements.iter() {
if SUB_RE.is_match(element) {
sub = Some(element.to_string())
let el = element.to_string();
sub = Some(match sub {
Some(mut res) => {
res.push(el);
res
}
None => vec![el],
})
} else if RESOLUTION_RE.is_match(element) {
resolution = Some(element.to_string())
} else if SOURCE_L1_RE.is_match(element) {
@ -292,11 +303,11 @@ pub fn parse_episode_meta_from_raw_name(s: &str) -> eyre::Result<RawEpisodeMeta>
let (sub, resolution, source) = extract_tags_from_title_extra(title_extra);
Ok(RawEpisodeMeta {
name_en,
name_en_no_season,
s_name_en: name_en_no_season,
name_jp,
name_jp_no_season,
s_name_jp: name_jp_no_season,
name_zh,
name_zh_no_season,
s_name_zh: name_zh_no_season,
season,
season_raw,
episode_index,

View File

@ -8,7 +8,7 @@ use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION};
use tokio::sync::RwLock;
use weak_table::WeakValueHashMap;
use crate::downloaders::defs::{ApiClient, DEFAULT_USER_AGENT};
use crate::downloaders::ApiClient;
pub(crate) const TMDB_API_ORIGIN: &str = "https://api.themoviedb.org";

View File

@ -1 +1,4 @@
pub mod torrent_path;
pub mod url_utils;
pub use url_utils::{extract_extname_from_url, extract_filename_from_url};

View File

@ -3,7 +3,7 @@ use std::collections::HashSet;
use quirks_path::{Path, PathBuf};
use crate::{
downloaders::defs::Torrent,
downloaders::torrent::Torrent,
models::{bangumi, subscribers},
parsers::defs::SEASON_REGEX,
};
@ -70,7 +70,7 @@ pub fn gen_bangumi_sub_path(data: &bangumi::Model) -> PathBuf {
PathBuf::from(data.official_title.to_string()).join(format!("Season {}", data.season))
}
pub fn rule_name(bgm: &bangumi::Model, conf: &subscribers::SubscriberBangumiConfig) -> String {
pub fn rule_name(bgm: &bangumi::Model, conf: &subscribers::SubscribeBangumiConfig) -> String {
if let (true, Some(group_name)) = (conf.leading_fansub_tag, &bgm.fansub) {
format!("[{}] {} S{}", group_name, bgm.official_title, bgm.season)
} else {

View File

@ -0,0 +1,19 @@
use quirks_path::Path;
use url::Url;
pub fn extract_filename_from_url(url: &Url) -> Option<&str> {
url.path_segments().and_then(|s| s.last()).and_then(|last| {
if last.is_empty() {
Some(last)
} else {
None
}
})
}
pub fn extract_extname_from_url(url: &Url) -> Option<String> {
let filename = extract_filename_from_url(url);
filename
.and_then(|f| Path::new(f).extension())
.map(|ext| format!(".{}", ext))
}

View File

@ -1,3 +1,5 @@
use std::fmt::Display;
use bytes::Bytes;
use opendal::{layers::LoggingLayer, services, Operator};
use quirks_path::{Path, PathBuf};
@ -9,11 +11,11 @@ use crate::config::AppDalConf;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AppDalContentCategory {
pub enum DalContentType {
Poster,
}
impl AsRef<str> for AppDalContentCategory {
impl AsRef<str> for DalContentType {
fn as_ref(&self) -> &str {
match self {
Self::Poster => "poster",
@ -22,7 +24,7 @@ impl AsRef<str> for AppDalContentCategory {
}
#[derive(Debug, Clone)]
pub struct AppDalContext {
pub struct DalContext {
pub config: AppDalConf,
}
@ -31,16 +33,35 @@ pub enum DalStoredUrl {
Absolute { url: Url },
}
impl AppDalContext {
pub fn new(app_dal_conf: AppDalConf) -> Self {
Self {
config: app_dal_conf,
impl DalStoredUrl {
pub fn as_str(&self) -> &str {
match self {
Self::RelativePath { path } => path.as_str(),
Self::Absolute { url } => url.as_str(),
}
}
}
impl AsRef<str> for DalStoredUrl {
fn as_ref(&self) -> &str {
self.as_str()
}
}
impl Display for DalStoredUrl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str().to_string())
}
}
impl DalContext {
pub fn new(dal_conf: AppDalConf) -> Self {
Self { config: dal_conf }
}
pub async fn store_blob(
&self,
content_category: AppDalContentCategory,
content_category: DalContentType,
extname: &str,
data: Bytes,
subscriber_pid: &str,

View File

@ -0,0 +1,40 @@
use std::sync::Arc;
use eyre::Context;
use loco_rs::app::AppContext;
use tokio::sync::OnceCell;
use crate::{
config::{deserialize_key_path_from_loco_rs_config, AppDalConf},
storage::DalContext,
};
static APP_DAL_CONTEXT: OnceCell<Arc<DalContext>> = OnceCell::const_new();
#[async_trait::async_trait]
pub trait AppContextDalExt {
async fn get_dal(&self) -> eyre::Result<Arc<DalContext>>;
async fn get_dal_unwrap(&self) -> Arc<DalContext>;
async fn init_dal(&self) -> eyre::Result<Arc<DalContext>> {
self.get_dal().await.wrap_err("dal context failed to init")
}
}
#[async_trait::async_trait]
impl AppContextDalExt for AppContext {
async fn get_dal(&self) -> eyre::Result<Arc<DalContext>> {
let context = APP_DAL_CONTEXT
.get_or_try_init(|| async {
deserialize_key_path_from_loco_rs_config::<AppDalConf>(&["dal"], &self.config)
.map(|dal_conf| Arc::new(DalContext::new(dal_conf)))
})
.await?;
Ok(context.clone())
}
async fn get_dal_unwrap(&self) -> Arc<DalContext> {
self.get_dal()
.await
.unwrap_or_else(|e| panic!("dal context failed to init: {}", e))
}
}

View File

@ -0,0 +1,26 @@
use axum::Router as AxumRouter;
use loco_rs::app::{AppContext, Initializer};
use crate::storage::AppContextDalExt;
pub struct AppDalInitializer;
#[async_trait::async_trait]
impl Initializer for AppDalInitializer {
fn name(&self) -> String {
"AppDalInitializer".to_string()
}
async fn before_run(&self, ctx: &AppContext) -> loco_rs::Result<()> {
ctx.init_dal().await?;
Ok(())
}
async fn after_routes(
&self,
router: AxumRouter,
_ctx: &AppContext,
) -> loco_rs::Result<AxumRouter> {
Ok(router)
}
}

View File

@ -0,0 +1,7 @@
pub mod dal;
pub mod dal_ext;
pub mod dal_initializer;
pub use dal::{DalContentType, DalContext, DalStoredUrl};
pub use dal_ext::AppContextDalExt;
pub use dal_initializer::AppDalInitializer;

View File

@ -2,7 +2,7 @@ use loco_rs::prelude::*;
use serde::{Deserialize, Serialize};
use tracing::info;
use crate::models::{bangumi, subscribers};
use crate::models::bangumi;
pub struct CollectHistoryEpisodesWorker {
pub ctx: AppContext,
@ -14,11 +14,12 @@ pub enum CollectHistoryEpisodesWorkerArgs {
}
impl CollectHistoryEpisodesWorker {
pub async fn collect_history_episodes(bangumi: &bangumi::Model, only_season: bool) {
pub async fn collect_history_episodes(bangumi: &bangumi::Model, _only_season: bool) {
info!(
"Start collecting {} season {}...",
bangumi.official_title, bangumi.season
);
todo!()
}
}
@ -30,7 +31,7 @@ impl worker::AppWorker<CollectHistoryEpisodesWorkerArgs> for CollectHistoryEpiso
#[async_trait]
impl worker::Worker<CollectHistoryEpisodesWorkerArgs> for CollectHistoryEpisodesWorker {
async fn perform(&self, args: CollectHistoryEpisodesWorkerArgs) -> worker::Result<()> {
async fn perform(&self, _args: CollectHistoryEpisodesWorkerArgs) -> worker::Result<()> {
println!("================================================");
let db = &self.ctx.db;

View File

@ -20,11 +20,11 @@ impl worker::AppWorker<SubscriptionWorkerArgs> for SubscriptionWorker {
#[async_trait]
impl worker::Worker<SubscriptionWorkerArgs> for SubscriptionWorker {
async fn perform(&self, args: SubscriptionWorkerArgs) -> worker::Result<()> {
async fn perform(&self, _args: SubscriptionWorkerArgs) -> worker::Result<()> {
println!("================================================");
let db = &self.ctx.db;
let storage = &self.ctx.storage;
// let db = &self.ctx.db;
// let storage = &self.ctx.storage;
println!("================================================");
Ok(())