feat: support mikan rss links

This commit is contained in:
master 2024-12-28 07:18:54 +08:00
parent 6149710fe0
commit e93a8a0dec
83 changed files with 5099 additions and 2561 deletions

3
.gitignore vendored
View File

@ -221,4 +221,5 @@ index.d.ts.map
/*.session.sql
/temp
/rustc-ice-*
/rustc-ice-*
/data

88
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,88 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "lldb",
"request": "launch",
"name": "debug quirks_path lib",
"cargo": {
"args": [
"test",
"--no-run",
"--lib",
"--package=quirks_path"
],
"filter": {
"name": "quirks_path",
"kind": "lib"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "debug recorder bin",
"cargo": {
"args": [
"build",
"--bin=recorder_cli",
"--package=recorder",
],
"filter": {
"name": "recorder_cli",
"kind": "bin"
}
},
"args": [
"--environment",
"recorder.development"
],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "debug playground example",
"cargo": {
"args": [
"build",
"--example=playground",
"--package=recorder",
],
"filter": {
"name": "playground",
"kind": "example"
}
},
"args": [
"--environment",
"recorder.development"
],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "debug record lib",
"cargo": {
"args": [
"test",
"--no-run",
"--test=mod",
"--package=recorder"
],
"filter": {
"name": "mod",
"kind": "test"
}
},
"args": [],
"cwd": "${workspaceFolder}"
}
]
}

4216
Cargo.lock generated

File diff suppressed because it is too large.

View File

@ -1,3 +1,3 @@
[workspace]
members = ["crates/quirks_path", "crates/recorder"]
members = ["crates/quirks-path", "crates/recorder", "crates/torrent"]
resolver = "2"

View File

@ -88,7 +88,7 @@ database:
# Database connection URI
uri: '{{ get_env(name="DATABASE_URL", default="postgres://konobangu:konobangu@127.0.0.1:5432/konobangu") }}'
# When enabled, the sql query will be logged.
enable_logging: false
enable_logging: true
# Set the timeout duration when acquiring a connection.
connect_timeout: 500
# Set the idle duration before closing a connection.
@ -111,3 +111,6 @@ redis:
# Dangerously flush all data in Redis on startup. dangerous operation, make sure that you using this flag only on dev environments or test mode
dangerously_flush: false
settings:
dal:
fs_root: ./temp

View File

@ -1,4 +1,3 @@
#![feature(strict_provenance)]
#![feature(extend_one)]
mod url;
@ -605,10 +604,9 @@ impl<'a> PartialEq for Components<'a> {
&& self.back == State::Body
&& other.back == State::Body
&& self.prefix_verbatim() == other.prefix_verbatim()
&& self.path == other.path
{
if self.path == other.path {
return true;
}
return true;
}
Iterator::eq(self.clone().rev(), other.clone().rev())
@ -714,10 +712,11 @@ impl PathBuf {
fn _push(&mut self, path: &Path) {
let main_sep_str = self.get_main_sep();
let mut need_sep = self
.as_mut_vec()
.last()
.map_or(false, |c| !is_separator(*c as char));
.is_some_and(|c| !is_separator(*c as char));
let comps = self.components();
@ -772,11 +771,11 @@ impl PathBuf {
self.inner.push(main_sep_str)
}
self.inner.extend(path)
self.inner.push_str(path.as_str())
}
pub fn pop(&mut self) -> bool {
match self.parent().map(|p| p.inner.as_bytes().len()) {
match self.parent().map(|p| p.inner.len()) {
Some(len) => {
self.as_mut_vec().truncate(len);
true
@ -1121,7 +1120,7 @@ impl Eq for PathBuf {}
impl PartialOrd for PathBuf {
#[inline]
fn partial_cmp(&self, other: &PathBuf) -> Option<cmp::Ordering> {
Some(compare_components(self.components(), other.components()))
Some(self.cmp(other))
}
}
@ -1199,7 +1198,7 @@ impl Path {
}
pub fn ancestors(&self) -> Ancestors<'_> {
Ancestors { next: Some(&self) }
Ancestors { next: Some(self) }
}
pub fn file_name(&self) -> Option<&str> {
@ -1247,7 +1246,7 @@ impl Path {
pub fn file_prefix(&self) -> Option<&str> {
self.file_name()
.map(split_file_at_dot)
.and_then(|(before, _after)| Some(before))
.map(|(before, _after)| before)
}
pub fn extension(&self) -> Option<&str> {

View File

@ -13,57 +13,57 @@ name = "recorder_cli"
path = "src/bin/main.rs"
required-features = []
[features]
default = []
testcontainers = []
[dependencies]
loco-rs = { version = "0.3.1" }
loco-rs = { version = "0.13.2" }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
eyre = "0.6"
tokio = { version = "1.33.0", default-features = false }
async-trait = "0.1.74"
tracing = "0.1.40"
tokio = { version = "1.42", default-features = false }
async-trait = "0.1.83"
tracing = "0.1.41"
chrono = "0.4"
validator = { version = "0.16" }
sea-orm = { version = "1.0.0-rc.1", features = [
validator = { version = "0.19" }
sea-orm = { version = "1.1.3", features = [
"sqlx-sqlite",
"sqlx-postgres",
"runtime-tokio-rustls",
"macros",
] }
axum = "0.7.1"
axum = "0.7.9"
include_dir = "0.7"
uuid = { version = "1.6.0", features = ["v4"] }
tracing-subscriber = { version = "0.3.17", features = ["env-filter", "json"] }
sea-orm-migration = { version = "1.0.0-rc.1", features = [
"runtime-tokio-rustls",
] }
reqwest = "0.11.24"
thiserror = "1.0.57"
rss = "2.0.7"
bytes = "1.5.0"
futures = "0.3.30"
itertools = "0.12.1"
qbit-rs = { git = "https://github.com/George-Miao/qbit.git", rev = "ad5af6a", features = ["default", "builder"] }
url = "2.5.0"
fancy-regex = "0.13.0"
regex = "1.10.3"
lazy_static = "1.4.0"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] }
sea-orm-migration = { version = "1.1.3", features = ["runtime-tokio-rustls"] }
reqwest = "0.12.9"
thiserror = "2"
rss = "2"
bytes = "1.9"
futures = "0.3.31"
itertools = "0.13.0"
url = "2.5"
fancy-regex = "0.14"
regex = "1.11"
lazy_static = "1.5"
maplit = "1.0.2"
tl = { version = "0.7.8", features = ["simd"] }
lightningcss = "1.0.0-alpha.54"
lightningcss = "1.0.0-alpha.61"
html-escape = "0.2.13"
opendal = "0.45.0"
librqbit-core = "3.5.0"
quirks_path = { path = "../quirks_path" }
opendal = { version = "0.51.0", features = ["default", "services-fs"] }
quirks_path = { path = "../quirks-path" }
torrent = { path = "../torrent" }
zune-image = "0.4.15"
once_cell = "1.20.2"
reqwest-middleware = "0.4.0"
reqwest-retry = "0.7.0"
reqwest-tracing = "0.5.5"
scraper = "0.22.0"
leaky-bucket = "1.1.2"
serde_with = "3"
[dev-dependencies]
serial_test = "2.0.0"
rstest = "0.18.2"
loco-rs = { version = "0.3.1", features = ["testing"] }
insta = { version = "1.34.0", features = ["redactions", "yaml", "filters"] }
testcontainers = { version = "0.15.0" }
testcontainers-modules = { version = "0.3.5" }
serial_test = "3"
rstest = "0.23.0"
loco-rs = { version = "0.13.2", features = ["testing"] }
insta = { version = "1", features = ["redactions", "yaml", "filters"] }
testcontainers = { version = "0.23.1" }
testcontainers-modules = { version = "0.11.4" }

View File

@ -1,20 +1,62 @@
#![allow(unused_imports)]
use eyre::Context;
use loco_rs::{cli::playground, prelude::*};
use itertools::Itertools;
use loco_rs::{
app::Hooks,
boot::{BootResult, StartMode},
environment::Environment,
prelude::*,
};
use recorder::{
app::App,
extract::mikan::parse_mikan_rss_items_from_rss_link,
migrations::Migrator,
models::{
subscribers::ROOT_SUBSCRIBER,
subscriptions::{self, SubscriptionCreateFromRssDto},
},
};
use sea_orm_migration::MigratorTrait;
async fn fetch_and_parse_rss_demo() -> eyre::Result<()> {
let url =
"https://mikanani.me/RSS/MyBangumi?token=FE9tccsML2nBPUUqpCuJW2uJZydAXCntHJ7RpD9LDP8%3d";
async fn pull_mikan_bangumi_rss(ctx: &AppContext) -> eyre::Result<()> {
let rss_link = "https://mikanani.me/RSS/Bangumi?bangumiId=3416&subgroupid=370";
let subscription = if let Some(subscription) = subscriptions::Entity::find()
.filter(subscriptions::Column::SourceUrl.eq(String::from(rss_link)))
.one(&ctx.db)
.await?
{
subscription
} else {
subscriptions::Model::add_subscription(
ctx,
subscriptions::SubscriptionCreateDto::Mikan(SubscriptionCreateFromRssDto {
rss_link: rss_link.to_string(),
display_name: String::from("Mikan Project - 我的番组"),
enabled: Some(true),
}),
1,
)
.await?
};
subscription.pull_subscription(ctx).await?;
let res = reqwest::get(url).await?.bytes().await?;
let channel = rss::Channel::read_from(&res[..])?;
println!("channel: {:#?}", channel);
Ok(())
}
async fn init() -> eyre::Result<AppContext> {
let ctx = loco_rs::cli::playground::<App>().await?;
let BootResult {
app_context: ctx, ..
} = loco_rs::boot::run_app::<App>(&StartMode::ServerOnly, ctx).await?;
Migrator::up(&ctx.db, None).await?;
Ok(ctx)
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
fetch_and_parse_rss_demo().await?;
let ctx = init().await?;
pull_mikan_bangumi_rss(&ctx).await?;
// let active_model: articles::ActiveModel = ActiveModel {
// title: Set(Some("how to build apps in 3 steps".to_string())),
@ -25,7 +67,6 @@ async fn main() -> eyre::Result<()> {
// let res = articles::Entity::find().all(&ctx.db).await.unwrap();
// println!("{:?}", res);
println!("welcome to playground. edit me at `examples/playground.rs`");
Ok(())
}

View File

@ -4,20 +4,37 @@ use async_trait::async_trait;
use loco_rs::{
app::{AppContext, Hooks},
boot::{create_app, BootResult, StartMode},
cache,
controller::AppRoutes,
db::truncate_table,
environment::Environment,
prelude::*,
task::Tasks,
worker::{AppWorker, Processor},
Result,
};
use sea_orm::DatabaseConnection;
use crate::{
controllers, migrations::Migrator, models::entities::subscribers,
controllers,
dal::{AppDalClient, AppDalInitalizer},
extract::mikan::{client::AppMikanClientInitializer, AppMikanClient},
migrations::Migrator,
models::entities::subscribers,
workers::subscription_worker::SubscriptionWorker,
};
pub trait AppContextExt {
fn get_dal_client(&self) -> &AppDalClient {
AppDalClient::global()
}
fn get_mikan_client(&self) -> &AppMikanClient {
AppMikanClient::global()
}
}
impl AppContextExt for AppContext {}
pub struct App;
#[async_trait]
@ -26,6 +43,15 @@ impl Hooks for App {
env!("CARGO_CRATE_NAME")
}
async fn initializers(_ctx: &AppContext) -> Result<Vec<Box<dyn Initializer>>> {
let initializers: Vec<Box<dyn Initializer>> = vec![
Box::new(AppDalInitalizer),
Box::new(AppMikanClientInitializer),
];
Ok(initializers)
}
fn app_version() -> String {
format!(
"{} ({})",
@ -46,8 +72,16 @@ impl Hooks for App {
.add_route(controllers::subscribers::routes())
}
fn connect_workers<'a>(p: &'a mut Processor, ctx: &'a AppContext) {
p.register(SubscriptionWorker::build(ctx));
async fn connect_workers(ctx: &AppContext, queue: &Queue) -> Result<()> {
queue.register(SubscriptionWorker::build(ctx)).await?;
Ok(())
}
async fn after_context(ctx: AppContext) -> Result<AppContext> {
Ok(AppContext {
cache: cache::Cache::new(cache::drivers::inmem::new()).into(),
..ctx
})
}
fn register_tasks(_tasks: &mut Tasks) {}
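For orientation, a minimal sketch of how the new AppContextExt trait is meant to be used from a handler or worker once the initializers above have run (the demo function itself is hypothetical):

use loco_rs::prelude::*;

use crate::app::AppContextExt;

// Hypothetical helper: both global clients are set up by the initializers
// registered in Hooks::initializers, so any AppContext can reach them.
async fn demo(ctx: &AppContext) -> eyre::Result<()> {
    let mikan = ctx.get_mikan_client();
    let dal = ctx.get_dal_client();
    tracing::debug!(base_url = mikan.base_url(), dal_config = ?dal.config, "clients ready");
    Ok(())
}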

View File

@ -1,8 +1,8 @@
use loco_rs::cli;
use recorder::migrations::Migrator;
use recorder::app::App;
use recorder::{app::App, migrations::Migrator};
#[tokio::main]
async fn main() -> eyre::Result<()> {
cli::main::<App, Migrator>().await
cli::main::<App, Migrator>().await?;
Ok(())
}

View File

@ -1,10 +0,0 @@
use serde::{Deserialize, Serialize};
pub fn default_app_dal_fs_root() -> String {
String::from("data")
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppDalConf {
pub fs_root: String,
}

View File

@ -1,44 +1,53 @@
pub mod dal_conf;
pub use dal_conf::AppDalConf;
use eyre::OptionExt;
use itertools::Itertools;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde::de::DeserializeOwned;
pub const DAL_CONF_KEY: &str = "dal";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppCustomConf {
pub dal: AppDalConf,
}
use crate::{
dal::{config::AppDalConfig, DAL_CONF_KEY},
extract::mikan::{AppMikanConfig, MIKAN_CONF_KEY},
};
pub fn deserialize_key_path_from_json_value<T: DeserializeOwned>(
key_path: &[&str],
value: &serde_json::Value,
) -> eyre::Result<T> {
key_path: &[&str],
) -> Result<Option<T>, loco_rs::Error> {
let mut stack = vec![("", value)];
for key in key_path {
let current = stack.last().unwrap().1;
if let Some(v) = current.get(key) {
stack.push((key, v));
} else {
let failed_key_path = stack.iter().map(|s| s.0).collect_vec().join(".");
return Err(eyre::eyre!(
"can not config key {} of settings",
failed_key_path
));
return Ok(None);
}
}
let result: T = serde_json::from_value(stack.pop().unwrap().1.clone())?;
Ok(result)
Ok(Some(result))
}
pub fn deserialize_key_path_from_loco_rs_config<T: DeserializeOwned>(
key_path: &[&str],
pub fn deserialize_key_path_from_app_config<T: DeserializeOwned>(
app_config: &loco_rs::config::Config,
) -> eyre::Result<T> {
let settings = app_config
.settings
.as_ref()
.ok_or_eyre("App config setting not set")?;
deserialize_key_path_from_json_value(key_path, settings)
key_path: &[&str],
) -> Result<Option<T>, loco_rs::Error> {
let settings = app_config.settings.as_ref();
if let Some(settings) = settings {
deserialize_key_path_from_json_value(settings, key_path)
} else {
Ok(None)
}
}
pub trait AppConfigExt {
fn get_root_conf(&self) -> &loco_rs::config::Config;
fn get_dal_conf(&self) -> loco_rs::Result<Option<AppDalConfig>> {
deserialize_key_path_from_app_config(self.get_root_conf(), &[DAL_CONF_KEY])
}
fn get_mikan_conf(&self) -> loco_rs::Result<Option<AppMikanConfig>> {
deserialize_key_path_from_app_config(self.get_root_conf(), &[MIKAN_CONF_KEY])
}
}
impl AppConfigExt for loco_rs::config::Config {
fn get_root_conf(&self) -> &loco_rs::config::Config {
self
}
}
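As a quick illustration, the key-path helper resolves nested settings keys and yields Ok(None) when a key is absent, so callers can fall back to defaults (a sketch; the settings JSON here is made up):

use serde_json::json;

use crate::{config::deserialize_key_path_from_json_value, dal::AppDalConfig};

fn demo() -> Result<(), loco_rs::Error> {
    let settings = json!({ "dal": { "data_dir": "./data" } });

    // Present key path: Ok(Some(AppDalConfig { data_dir: Some("./data") })).
    let dal: Option<AppDalConfig> = deserialize_key_path_from_json_value(&settings, &["dal"])?;
    assert_eq!(dal.and_then(|c| c.data_dir).as_deref(), Some("./data"));

    // Missing key path: Ok(None) instead of an error.
    let missing: Option<AppDalConfig> = deserialize_key_path_from_json_value(&settings, &["mikan"])?;
    assert!(missing.is_none());
    Ok(())
}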

View File

@ -2,8 +2,8 @@ use loco_rs::prelude::*;
use crate::{models::entities::subscribers, views::subscribers::CurrentResponse};
async fn current(State(ctx): State<AppContext>) -> Result<Json<CurrentResponse>> {
let subscriber = subscribers::Model::find_root(&ctx.db).await?;
async fn current(State(ctx): State<AppContext>) -> Result<impl IntoResponse> {
let subscriber = subscribers::Model::find_root(&ctx).await?;
format::json(CurrentResponse::new(&subscriber))
}

View File

@ -0,0 +1,201 @@
use std::fmt;
use bytes::Bytes;
use loco_rs::app::{AppContext, Initializer};
use once_cell::sync::OnceCell;
use opendal::{layers::LoggingLayer, services::Fs, Buffer, Operator};
use quirks_path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use url::Url;
use uuid::Uuid;
use super::AppDalConfig;
use crate::config::AppConfigExt;
// TODO: integrate with app-context-trait once it is available
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DalContentCategory {
Image,
}
impl AsRef<str> for DalContentCategory {
fn as_ref(&self) -> &str {
match self {
Self::Image => "image",
}
}
}
#[derive(Debug, Clone)]
pub struct AppDalClient {
pub config: AppDalConfig,
}
static APP_DAL_CLIENT: OnceCell<AppDalClient> = OnceCell::new();
pub enum DalStoredUrl {
RelativePath { path: String },
Absolute { url: Url },
}
impl AsRef<str> for DalStoredUrl {
fn as_ref(&self) -> &str {
match &self {
Self::Absolute { url } => url.as_str(),
Self::RelativePath { path } => path,
}
}
}
impl fmt::Display for DalStoredUrl {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_ref())
}
}
impl AppDalClient {
pub fn new(config: AppDalConfig) -> Self {
Self { config }
}
pub fn global() -> &'static AppDalClient {
APP_DAL_CLIENT
.get()
.expect("Global app dal client is not initialized")
}
pub fn get_fs(&self) -> Fs {
Fs::default().root(
self.config
.data_dir
.as_ref()
.map(|s| s as &str)
.unwrap_or("data"),
)
}
pub fn create_filename(extname: &str) -> String {
format!("{}{}", Uuid::new_v4(), extname)
}
pub async fn store_object(
&self,
content_category: DalContentCategory,
subscriber_pid: &str,
bucket: Option<&str>,
filename: &str,
data: Bytes,
) -> eyre::Result<DalStoredUrl> {
match content_category {
DalContentCategory::Image => {
let fullname = [
subscriber_pid,
content_category.as_ref(),
bucket.unwrap_or_default(),
filename,
]
.into_iter()
.map(Path::new)
.collect::<PathBuf>();
let fs_op = Operator::new(self.get_fs())?
.layer(LoggingLayer::default())
.finish();
if let Some(dirname) = fullname.parent() {
let dirname = dirname.join("/");
fs_op.create_dir(dirname.as_str()).await?;
}
fs_op.write(fullname.as_str(), data).await?;
Ok(DalStoredUrl::RelativePath {
path: fullname.to_string(),
})
}
}
}
pub async fn exists_object(
&self,
content_category: DalContentCategory,
subscriber_pid: &str,
bucket: Option<&str>,
filename: &str,
) -> eyre::Result<Option<DalStoredUrl>> {
match content_category {
DalContentCategory::Image => {
let fullname = [
subscriber_pid,
content_category.as_ref(),
bucket.unwrap_or_default(),
filename,
]
.into_iter()
.map(Path::new)
.collect::<PathBuf>();
let fs_op = Operator::new(self.get_fs())?
.layer(LoggingLayer::default())
.finish();
if fs_op.exists(fullname.as_str()).await? {
Ok(Some(DalStoredUrl::RelativePath {
path: fullname.to_string(),
}))
} else {
Ok(None)
}
}
}
}
pub async fn load_object(
&self,
content_category: DalContentCategory,
subscriber_pid: &str,
bucket: Option<&str>,
filename: &str,
) -> eyre::Result<Buffer> {
match content_category {
DalContentCategory::Image => {
let fullname = [
subscriber_pid,
content_category.as_ref(),
bucket.unwrap_or_default(),
filename,
]
.into_iter()
.map(Path::new)
.collect::<PathBuf>();
let fs_op = Operator::new(self.get_fs())?
.layer(LoggingLayer::default())
.finish();
let data = fs_op.read(fullname.as_str()).await?;
Ok(data)
}
}
}
}
pub struct AppDalInitalizer;
#[async_trait::async_trait]
impl Initializer for AppDalInitalizer {
fn name(&self) -> String {
String::from("AppDalInitalizer")
}
async fn before_run(&self, app_context: &AppContext) -> loco_rs::Result<()> {
let config = &app_context.config;
let app_dal_conf = config.get_dal_conf()?;
APP_DAL_CLIENT.get_or_init(|| AppDalClient::new(app_dal_conf.unwrap_or_default()));
Ok(())
}
}
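A hedged usage sketch of the new DAL client (the subscriber pid and bucket are placeholders, and the client must already have been initialized by AppDalInitalizer):

use bytes::Bytes;

use crate::dal::{AppDalClient, DalContentCategory};

async fn demo() -> eyre::Result<()> {
    let dal = AppDalClient::global();
    let filename = AppDalClient::create_filename(".jpg");

    // Stored under <subscriber_pid>/image/<bucket>/<filename> on the fs backend.
    let stored = dal
        .store_object(
            DalContentCategory::Image,
            "subscriber-pid",                    // placeholder subscriber pid
            Some("mikan"),                       // bucket, e.g. MIKAN_BUCKET_KEY
            &filename,
            Bytes::from_static(b"\xFF\xD8\xFF"), // fake jpeg magic bytes
        )
        .await?;
    println!("stored at {stored}");

    // exists_object returns the stored url again when the object is present.
    let found = dal
        .exists_object(DalContentCategory::Image, "subscriber-pid", Some("mikan"), &filename)
        .await?;
    assert!(found.is_some());
    Ok(())
}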

View File

@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
pub const DAL_CONF_KEY: &str = "dal";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct AppDalConfig {
pub data_dir: Option<String>,
}

View File

@ -1,74 +1,4 @@
use bytes::Bytes;
use opendal::{layers::LoggingLayer, services, Operator};
use quirks_path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use url::Url;
use uuid::Uuid;
use crate::config::AppDalConf;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AppDalContentCategory {
Poster,
}
impl AsRef<str> for AppDalContentCategory {
fn as_ref(&self) -> &str {
match self {
Self::Poster => "poster",
}
}
}
#[derive(Debug, Clone)]
pub struct AppDalContext {
pub config: AppDalConf,
}
pub enum DalStoredUrl {
RelativePath { path: String },
Absolute { url: Url },
}
impl AppDalContext {
pub fn new(app_dal_conf: AppDalConf) -> Self {
Self {
config: app_dal_conf,
}
}
pub async fn store_blob(
&self,
content_category: AppDalContentCategory,
extname: &str,
data: Bytes,
subscriber_pid: &str,
) -> eyre::Result<DalStoredUrl> {
let basename = format!("{}{}", Uuid::new_v4(), extname);
let mut dirname = [subscriber_pid, content_category.as_ref()]
.into_iter()
.map(Path::new)
.collect::<PathBuf>();
let mut fs_builder = services::Fs::default();
fs_builder.root(self.config.fs_root.as_str());
let fs_op = Operator::new(fs_builder)?
.layer(LoggingLayer::default())
.finish();
fs_op.create_dir(dirname.as_str()).await?;
let fullname = {
dirname.push(basename);
dirname
};
fs_op.write_with(fullname.as_str(), data).await?;
Ok(DalStoredUrl::RelativePath {
path: fullname.to_string(),
})
}
}
pub mod client;
pub mod config;
pub use client::{AppDalClient, AppDalInitalizer, DalContentCategory};
pub use config::{AppDalConfig, DAL_CONF_KEY};

View File

@ -1,12 +0,0 @@
use bytes::Bytes;
use reqwest::IntoUrl;
use super::defs::DEFAULT_USER_AGENT;
pub async fn download_bytes<T: IntoUrl>(url: T) -> eyre::Result<Bytes> {
let request_client = reqwest::Client::builder()
.user_agent(DEFAULT_USER_AGENT)
.build()?;
let bytes = request_client.get(url).send().await?.bytes().await?;
Ok(bytes)
}

View File

@ -1,11 +0,0 @@
use reqwest::IntoUrl;
use super::defs::DEFAULT_USER_AGENT;
pub async fn download_html<U: IntoUrl>(url: U) -> eyre::Result<String> {
let request_client = reqwest::Client::builder()
.user_agent(DEFAULT_USER_AGENT)
.build()?;
let content = request_client.get(url).send().await?.text().await?;
Ok(content)
}

View File

@ -1,8 +0,0 @@
use bytes::Bytes;
use reqwest::IntoUrl;
use super::bytes::download_bytes;
pub async fn download_image<U: IntoUrl>(url: U) -> eyre::Result<Bytes> {
download_bytes(url).await
}

View File

@ -1,7 +0,0 @@
pub mod bytes;
pub mod defs;
pub mod error;
pub mod html;
pub mod qbitorrent;
pub mod torrent_downloader;
pub mod image;

View File

@ -1,96 +0,0 @@
use downloaders::DownloaderCategory;
use quirks_path::{Path, PathBuf};
use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, IntoActiveModel};
use url::Url;
use super::{
defs::{Torrent, TorrentFilter, TorrentSource},
qbitorrent::QBittorrentDownloader,
};
use crate::{
models::{bangumi, downloaders, downloads},
path::torrent_path::gen_bangumi_sub_path,
};
#[async_trait::async_trait]
pub trait TorrentDownloader {
async fn get_torrents_info(
&self,
status_filter: TorrentFilter,
category: Option<String>,
tag: Option<String>,
) -> eyre::Result<Vec<Torrent>>;
async fn add_torrents(
&self,
source: TorrentSource,
save_path: String,
category: Option<&str>,
) -> eyre::Result<()>;
async fn delete_torrents(&self, hashes: Vec<String>) -> eyre::Result<()>;
async fn rename_torrent_file(
&self,
hash: &str,
old_path: &str,
new_path: &str,
) -> eyre::Result<()>;
async fn move_torrents(&self, hashes: Vec<String>, new_path: &str) -> eyre::Result<()>;
async fn get_torrent_path(&self, hashes: String) -> eyre::Result<Option<String>>;
async fn check_connection(&self) -> eyre::Result<()>;
async fn set_torrents_category(&self, hashes: Vec<String>, category: &str) -> eyre::Result<()>;
async fn add_torrent_tags(&self, hashes: Vec<String>, tags: Vec<String>) -> eyre::Result<()>;
async fn add_category(&self, category: &str) -> eyre::Result<()>;
fn get_save_path(&self, sub_path: &Path) -> PathBuf;
async fn add_downloads_for_bangumi<'a, 'b>(
&self,
db: &'a DatabaseConnection,
downloads: &[&downloads::Model],
mut bangumi: bangumi::Model,
) -> eyre::Result<bangumi::Model> {
if bangumi.save_path.is_none() {
let gen_sub_path = gen_bangumi_sub_path(&bangumi);
let mut bangumi_active = bangumi.into_active_model();
bangumi_active.save_path = ActiveValue::Set(Some(gen_sub_path.to_string()));
bangumi = bangumi_active.update(db).await?;
}
let sub_path = bangumi
.save_path
.as_ref()
.unwrap_or_else(|| unreachable!("must have a sub path"));
let mut torrent_urls = vec![];
for m in downloads.iter() {
torrent_urls.push(Url::parse(&m.url as &str)?);
}
// make sequence to prevent too fast to be banned
for d in downloads.iter() {
let source = TorrentSource::parse(&d.url).await?;
self.add_torrents(source, sub_path.clone(), Some("bangumi"))
.await?;
}
Ok(bangumi)
}
}
pub async fn build_torrent_downloader_from_downloader_model(
model: downloaders::Model,
) -> eyre::Result<Box<dyn TorrentDownloader>> {
Ok(Box::new(match &model.category {
DownloaderCategory::QBittorrent => {
QBittorrentDownloader::from_downloader_model(model).await?
}
}))
}

View File

@ -12,4 +12,8 @@ pub enum ParseError {
expected: String,
found: String,
},
#[error("Parse mikan rss {url} format error")]
MikanRssFormatError { url: String },
#[error("Parse mikan rss item format error, {reason}")]
MikanRssItemFormatError { reason: String },
}

View File

@ -0,0 +1,3 @@
pub mod styles;
pub use styles::parse_style_attr;

View File

@ -0,0 +1,6 @@
use lightningcss::declaration::DeclarationBlock;
pub fn parse_style_attr(style_attr: &str) -> Option<DeclarationBlock> {
let result = DeclarationBlock::parse_string(style_attr, Default::default()).ok()?;
Some(result)
}

View File

@ -0,0 +1,64 @@
use std::ops::Deref;
use loco_rs::app::{AppContext, Initializer};
use once_cell::sync::OnceCell;
use super::{AppMikanConfig, MIKAN_BASE_URL};
use crate::{config::AppConfigExt, fetch::HttpClient};
static APP_MIKAN_CLIENT: OnceCell<AppMikanClient> = OnceCell::new();
pub struct AppMikanClient {
http_client: HttpClient,
base_url: String,
}
impl AppMikanClient {
pub fn new(mut config: AppMikanConfig) -> loco_rs::Result<Self> {
let http_client =
HttpClient::new(config.http_client.take()).map_err(loco_rs::Error::wrap)?;
let base_url = config
.base_url
.unwrap_or_else(|| String::from(MIKAN_BASE_URL));
Ok(Self {
http_client,
base_url,
})
}
pub fn global() -> &'static AppMikanClient {
APP_MIKAN_CLIENT
.get()
.expect("Global mikan http client is not initialized")
}
pub fn base_url(&self) -> &str {
&self.base_url
}
}
impl Deref for AppMikanClient {
type Target = HttpClient;
fn deref(&self) -> &Self::Target {
&self.http_client
}
}
pub struct AppMikanClientInitializer;
#[async_trait::async_trait]
impl Initializer for AppMikanClientInitializer {
fn name(&self) -> String {
"AppMikanClientInitializer".to_string()
}
async fn before_run(&self, app_context: &AppContext) -> loco_rs::Result<()> {
let config = &app_context.config;
let app_mikan_conf = config.get_mikan_conf()?.unwrap_or_default();
APP_MIKAN_CLIENT.get_or_try_init(|| AppMikanClient::new(app_mikan_conf))?;
Ok(())
}
}

View File

@ -0,0 +1,11 @@
use serde::{Deserialize, Serialize};
use crate::fetch::HttpClientConfig;
pub const MIKAN_CONF_KEY: &str = "mikan";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct AppMikanConfig {
pub http_client: Option<HttpClientConfig>,
pub base_url: Option<String>,
}

View File

@ -0,0 +1,4 @@
pub const MIKAN_BUCKET_KEY: &str = "mikan";
pub const MIKAN_BASE_URL: &str = "https://mikanani.me";
pub const MIKAN_UNKNOWN_FANSUB_NAME: &str = "生肉/不明字幕";
pub const MIKAN_UNKNOWN_FANSUB_ID: &str = "202";

View File

@ -0,0 +1,22 @@
pub mod client;
pub mod config;
pub mod constants;
pub mod rss_parser;
pub mod web_parser;
pub use client::{AppMikanClient, AppMikanClientInitializer};
pub use config::{AppMikanConfig, MIKAN_CONF_KEY};
pub use constants::{MIKAN_BASE_URL, MIKAN_BUCKET_KEY};
pub use rss_parser::{
build_mikan_bangumi_rss_link, build_mikan_subscriber_aggregation_rss_link,
parse_mikan_bangumi_id_from_rss_link, parse_mikan_rss_channel_from_rss_link,
parse_mikan_rss_items_from_rss_link, parse_mikan_subscriber_aggregation_id_from_rss_link,
MikanBangumiAggregationRssChannel, MikanBangumiRssChannel, MikanBangumiRssLink,
MikanRssChannel, MikanRssItem, MikanSubscriberAggregationRssChannel,
MikanSubscriberAggregationRssLink,
};
pub use web_parser::{
build_mikan_bangumi_homepage, build_mikan_episode_homepage,
parse_mikan_bangumi_meta_from_mikan_homepage, parse_mikan_episode_meta_from_mikan_homepage,
MikanBangumiMeta, MikanEpisodeMeta,
};

View File

@ -0,0 +1,353 @@
use std::ops::Deref;
use chrono::DateTime;
use itertools::Itertools;
use reqwest::IntoUrl;
use serde::{Deserialize, Serialize};
use torrent::core::BITTORRENT_MIME_TYPE;
use url::Url;
use super::{
web_parser::{parse_mikan_episode_id_from_homepage, MikanEpisodeHomepage},
AppMikanClient,
};
use crate::{extract::errors::ParseError, fetch::bytes::download_bytes_with_client};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MikanRssItem {
pub title: String,
pub homepage: Url,
pub url: Url,
pub content_length: Option<u64>,
pub mime: String,
pub pub_date: Option<i64>,
pub mikan_episode_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MikanBangumiRssChannel {
pub name: String,
pub url: Url,
pub mikan_bangumi_id: String,
pub mikan_fansub_id: String,
pub items: Vec<MikanRssItem>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MikanBangumiAggregationRssChannel {
pub name: String,
pub url: Url,
pub mikan_bangumi_id: String,
pub items: Vec<MikanRssItem>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MikanSubscriberAggregationRssChannel {
pub mikan_aggregation_id: String,
pub url: Url,
pub items: Vec<MikanRssItem>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum MikanRssChannel {
Bangumi(MikanBangumiRssChannel),
BangumiAggregation(MikanBangumiAggregationRssChannel),
SubscriberAggregation(MikanSubscriberAggregationRssChannel),
}
impl MikanRssChannel {
pub fn items(&self) -> &[MikanRssItem] {
match &self {
Self::Bangumi(MikanBangumiRssChannel { items, .. })
| Self::BangumiAggregation(MikanBangumiAggregationRssChannel { items, .. })
| Self::SubscriberAggregation(MikanSubscriberAggregationRssChannel { items, .. }) => {
items
}
}
}
pub fn into_items(self) -> Vec<MikanRssItem> {
match self {
Self::Bangumi(MikanBangumiRssChannel { items, .. })
| Self::BangumiAggregation(MikanBangumiAggregationRssChannel { items, .. })
| Self::SubscriberAggregation(MikanSubscriberAggregationRssChannel { items, .. }) => {
items
}
}
}
pub fn name(&self) -> Option<&str> {
match &self {
Self::Bangumi(MikanBangumiRssChannel { name, .. })
| Self::BangumiAggregation(MikanBangumiAggregationRssChannel { name, .. }) => {
Some(name.as_str())
}
Self::SubscriberAggregation(MikanSubscriberAggregationRssChannel { .. }) => None,
}
}
pub fn url(&self) -> &Url {
match &self {
Self::Bangumi(MikanBangumiRssChannel { url, .. })
| Self::BangumiAggregation(MikanBangumiAggregationRssChannel { url, .. })
| Self::SubscriberAggregation(MikanSubscriberAggregationRssChannel { url, .. }) => url,
}
}
}
impl TryFrom<rss::Item> for MikanRssItem {
type Error = ParseError;
fn try_from(item: rss::Item) -> Result<Self, Self::Error> {
let mime_type = item
.enclosure()
.map(|x| x.mime_type.to_string())
.unwrap_or_default();
if mime_type == BITTORRENT_MIME_TYPE {
let enclosure = item.enclosure.unwrap();
let homepage = item
.link
.ok_or_else(|| ParseError::MikanRssItemFormatError {
reason: String::from("must have a link for the homepage"),
})?;
let homepage = Url::parse(&homepage)?;
let enclosure_url = Url::parse(&enclosure.url)?;
let MikanEpisodeHomepage {
mikan_episode_id, ..
} = parse_mikan_episode_id_from_homepage(&homepage).ok_or_else(|| {
ParseError::MikanRssItemFormatError {
reason: String::from("homepage link format invalid"),
}
})?;
Ok(MikanRssItem {
title: item.title.unwrap_or_default(),
homepage,
url: enclosure_url,
content_length: enclosure.length.parse().ok(),
mime: enclosure.mime_type,
pub_date: item
.pub_date
.and_then(|s| DateTime::parse_from_rfc2822(&s).ok())
.map(|s| s.timestamp_millis()),
mikan_episode_id,
})
} else {
Err(ParseError::MimeError {
expected: String::from(BITTORRENT_MIME_TYPE),
found: mime_type,
desc: String::from("MikanRssItem"),
})
}
}
}
#[derive(Debug, Clone)]
pub struct MikanBangumiRssLink {
pub mikan_bangumi_id: String,
pub mikan_fansub_id: Option<String>,
}
#[derive(Debug, Clone)]
pub struct MikanSubscriberAggregationRssLink {
pub mikan_aggregation_id: String,
}
pub fn build_mikan_bangumi_rss_link(
mikan_base_url: &str,
mikan_bangumi_id: &str,
mikan_fansub_id: Option<&str>,
) -> eyre::Result<Url> {
let mut url = Url::parse(mikan_base_url)?;
url.set_path("/RSS/Bangumi");
url.query_pairs_mut()
.append_pair("bangumiId", mikan_bangumi_id);
if let Some(mikan_fansub_id) = mikan_fansub_id {
url.query_pairs_mut()
.append_pair("subgroupid", mikan_fansub_id);
};
Ok(url)
}
pub fn build_mikan_subscriber_aggregation_rss_link(
mikan_base_url: &str,
mikan_aggregation_id: &str,
) -> eyre::Result<Url> {
let mut url = Url::parse(mikan_base_url)?;
url.set_path("/RSS/MyBangumi");
url.query_pairs_mut()
.append_pair("token", mikan_aggregation_id);
Ok(url)
}
pub fn parse_mikan_bangumi_id_from_rss_link(url: &Url) -> Option<MikanBangumiRssLink> {
if url.path() == "/RSS/Bangumi" {
url.query_pairs()
.find(|(k, _)| k == "bangumiId")
.map(|(_, v)| MikanBangumiRssLink {
mikan_bangumi_id: v.to_string(),
mikan_fansub_id: url
.query_pairs()
.find(|(k, _)| k == "subgroupid")
.map(|(_, v)| v.to_string()),
})
} else {
None
}
}
pub fn parse_mikan_subscriber_aggregation_id_from_rss_link(
url: &Url,
) -> Option<MikanSubscriberAggregationRssLink> {
if url.path() == "/RSS/MyBangumi" {
url.query_pairs().find(|(k, _)| k == "token").map(|(_, v)| {
MikanSubscriberAggregationRssLink {
mikan_aggregation_id: v.to_string(),
}
})
} else {
None
}
}
pub async fn parse_mikan_rss_items_from_rss_link(
client: Option<&AppMikanClient>,
url: impl IntoUrl,
) -> eyre::Result<Vec<MikanRssItem>> {
let channel = parse_mikan_rss_channel_from_rss_link(client, url).await?;
Ok(channel.into_items())
}
pub async fn parse_mikan_rss_channel_from_rss_link(
client: Option<&AppMikanClient>,
url: impl IntoUrl,
) -> eyre::Result<MikanRssChannel> {
let http_client = client.map(|s| s.deref());
let bytes = download_bytes_with_client(http_client, url.as_str()).await?;
let channel = rss::Channel::read_from(&bytes[..])?;
let channel_link = Url::parse(channel.link())?;
if let Some(MikanBangumiRssLink {
mikan_bangumi_id,
mikan_fansub_id,
}) = parse_mikan_bangumi_id_from_rss_link(&channel_link)
{
let channel_name = channel.title().replace("Mikan Project - ", "");
let items = channel
.items
.into_iter()
// @TODO log error
.flat_map(MikanRssItem::try_from)
.collect_vec();
if let Some(mikan_fansub_id) = mikan_fansub_id {
Ok(MikanRssChannel::Bangumi(MikanBangumiRssChannel {
name: channel_name,
mikan_bangumi_id,
mikan_fansub_id,
url: channel_link,
items,
}))
} else {
Ok(MikanRssChannel::BangumiAggregation(
MikanBangumiAggregationRssChannel {
name: channel_name,
mikan_bangumi_id,
url: channel_link,
items,
},
))
}
} else if let Some(MikanSubscriberAggregationRssLink {
mikan_aggregation_id,
..
}) = parse_mikan_subscriber_aggregation_id_from_rss_link(&channel_link)
{
let items = channel
.items
.into_iter()
// @TODO log error
.flat_map(MikanRssItem::try_from)
.collect_vec();
return Ok(MikanRssChannel::SubscriberAggregation(
MikanSubscriberAggregationRssChannel {
mikan_aggregation_id,
items,
url: channel_link,
},
));
} else {
return Err(ParseError::MikanRssFormatError {
url: url.as_str().into(),
}
.into());
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use torrent::core::BITTORRENT_MIME_TYPE;
use crate::extract::mikan::{
parse_mikan_rss_channel_from_rss_link, MikanBangumiAggregationRssChannel,
MikanBangumiRssChannel, MikanRssChannel,
};
#[tokio::test]
pub async fn test_parse_mikan_rss_channel_from_rss_link() {
{
let bangumi_url = "https://mikanani.me/RSS/Bangumi?bangumiId=3141&subgroupid=370";
let channel = parse_mikan_rss_channel_from_rss_link(None, bangumi_url)
.await
.expect("should get mikan channel from rss url");
assert_matches!(
&channel,
MikanRssChannel::Bangumi(MikanBangumiRssChannel { .. })
);
assert_matches!(&channel.name(), Some("葬送的芙莉莲"));
let items = channel.items();
let first_sub_item = items
.first()
.expect("mikan subscriptions should have at least one subs");
assert_eq!(first_sub_item.mime, BITTORRENT_MIME_TYPE);
assert!(&first_sub_item
.homepage
.as_str()
.starts_with("https://mikanani.me/Home/Episode"));
let name = first_sub_item.title.as_str();
assert!(name.contains("葬送的芙莉莲"));
}
{
let bangumi_url = "https://mikanani.me/RSS/Bangumi?bangumiId=3416";
let channel = parse_mikan_rss_channel_from_rss_link(None, bangumi_url)
.await
.expect("should get mikan channel from rss url");
assert_matches!(
&channel,
MikanRssChannel::BangumiAggregation(MikanBangumiAggregationRssChannel { .. })
);
assert_matches!(&channel.name(), Some("叹气的亡灵想隐退"));
}
}
}
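A small round-trip sketch for the link helpers above (pure URL manipulation, no network; the ids are placeholders):

use crate::extract::mikan::{
    build_mikan_bangumi_rss_link, parse_mikan_bangumi_id_from_rss_link, MIKAN_BASE_URL,
};

fn demo() -> eyre::Result<()> {
    // Builds https://mikanani.me/RSS/Bangumi?bangumiId=3416&subgroupid=370
    let link = build_mikan_bangumi_rss_link(MIKAN_BASE_URL, "3416", Some("370"))?;

    // And parses the ids back out of any /RSS/Bangumi link.
    let parsed = parse_mikan_bangumi_id_from_rss_link(&link)
        .expect("a /RSS/Bangumi link should parse");
    assert_eq!(parsed.mikan_bangumi_id, "3416");
    assert_eq!(parsed.mikan_fansub_id.as_deref(), Some("370"));
    Ok(())
}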

View File

@ -0,0 +1,493 @@
use std::ops::Deref;
use bytes::Bytes;
use eyre::ContextCompat;
use html_escape::decode_html_entities;
use itertools::Itertools;
use lazy_static::lazy_static;
use lightningcss::{properties::Property, values::image::Image as CSSImage};
use loco_rs::app::AppContext;
use regex::Regex;
use scraper::Html;
use url::Url;
use super::{
parse_mikan_bangumi_id_from_rss_link, AppMikanClient, MikanBangumiRssLink, MIKAN_BUCKET_KEY,
};
use crate::{
app::AppContextExt,
dal::DalContentCategory,
extract::html::parse_style_attr,
fetch::{html::download_html_with_client, image::download_image_with_client},
models::subscribers,
};
#[derive(Clone, Debug, PartialEq)]
pub struct MikanEpisodeMeta {
pub homepage: Url,
pub origin_poster_src: Option<Url>,
pub bangumi_title: String,
pub episode_title: String,
pub fansub: String,
pub mikan_bangumi_id: String,
pub mikan_fansub_id: String,
pub mikan_episode_id: String,
}
#[derive(Clone, Debug, PartialEq)]
pub struct MikanBangumiMeta {
pub homepage: Url,
pub origin_poster_src: Option<Url>,
pub bangumi_title: String,
pub mikan_bangumi_id: String,
pub mikan_fansub_id: Option<String>,
pub fansub: Option<String>,
pub mikan_fansub_candidates: Vec<(String, String)>,
}
#[derive(Clone, Debug, PartialEq)]
pub struct MikanBangumiPosterMeta {
pub origin_poster_src: Url,
pub poster_data: Option<Bytes>,
pub poster_src: Option<String>,
}
#[derive(Clone, Debug, PartialEq)]
pub struct MikanEpisodeHomepage {
pub mikan_episode_id: String,
}
lazy_static! {
static ref MIKAN_TITLE_SEASON: Regex = Regex::new("第.*季").unwrap();
}
pub fn build_mikan_bangumi_homepage(
mikan_base_url: &str,
mikan_bangumi_id: &str,
mikan_fansub_id: Option<&str>,
) -> eyre::Result<Url> {
let mut url = Url::parse(mikan_base_url)?;
url.set_path(&format!("/Home/Bangumi/{mikan_bangumi_id}"));
url.set_fragment(mikan_fansub_id);
Ok(url)
}
pub fn build_mikan_episode_homepage(
mikan_base_url: &str,
mikan_episode_id: &str,
) -> eyre::Result<Url> {
let mut url = Url::parse(mikan_base_url)?;
url.set_path(&format!("/Home/Episode/{mikan_episode_id}"));
Ok(url)
}
pub fn parse_mikan_episode_id_from_homepage(url: &Url) -> Option<MikanEpisodeHomepage> {
if url.path().starts_with("/Home/Episode/") {
let mikan_episode_id = url.path().replace("/Home/Episode/", "");
Some(MikanEpisodeHomepage { mikan_episode_id })
} else {
None
}
}
pub async fn parse_mikan_bangumi_poster_from_origin_poster_src(
client: Option<&AppMikanClient>,
origin_poster_src: Url,
) -> eyre::Result<MikanBangumiPosterMeta> {
let http_client = client.map(|s| s.deref());
let poster_data = download_image_with_client(http_client, origin_poster_src.clone()).await?;
Ok(MikanBangumiPosterMeta {
origin_poster_src,
poster_data: Some(poster_data),
poster_src: None,
})
}
pub async fn parse_mikan_bangumi_poster_from_origin_poster_src_with_cache(
ctx: &AppContext,
origin_poster_src: Url,
subscriber_id: i32,
) -> eyre::Result<MikanBangumiPosterMeta> {
let dal_client = ctx.get_dal_client();
let mikan_client = ctx.get_mikan_client();
let subscriber_pid = &subscribers::Model::find_pid_by_id_with_cache(ctx, subscriber_id).await?;
if let Some(poster_src) = dal_client
.exists_object(
DalContentCategory::Image,
subscriber_pid,
Some(MIKAN_BUCKET_KEY),
&origin_poster_src.path().replace("/images/Bangumi/", ""),
)
.await?
{
return Ok(MikanBangumiPosterMeta {
origin_poster_src,
poster_data: None,
poster_src: Some(poster_src.to_string()),
});
}
let poster_data =
download_image_with_client(Some(mikan_client.deref()), origin_poster_src.clone()).await?;
let poster_str = dal_client
.store_object(
DalContentCategory::Image,
subscriber_pid,
Some(MIKAN_BUCKET_KEY),
&origin_poster_src.path().replace("/images/Bangumi/", ""),
poster_data.clone(),
)
.await?;
Ok(MikanBangumiPosterMeta {
origin_poster_src,
poster_data: Some(poster_data),
poster_src: Some(poster_str.to_string()),
})
}
pub async fn parse_mikan_bangumi_meta_from_mikan_homepage(
client: Option<&AppMikanClient>,
url: Url,
) -> eyre::Result<MikanBangumiMeta> {
let http_client = client.map(|s| s.deref());
let url_host = url.origin().unicode_serialization();
let content = download_html_with_client(http_client, url.as_str()).await?;
let html = Html::parse_document(&content);
let bangumi_fansubs = html
.select(&scraper::Selector::parse(".subgroup-text").unwrap())
.filter_map(|el| {
if let (Some(fansub_id), Some(fansub_name)) = (
el.value()
.attr("id")
.map(|s| decode_html_entities(s).trim().to_string()),
el.select(&scraper::Selector::parse("a:nth-child(1)").unwrap())
.next()
.map(|child| {
let mut s = String::from(
child
.prev_sibling()
.and_then(|t| t.value().as_text())
.map(|s| s.trim())
.unwrap_or_default(),
);
s.extend(child.text());
decode_html_entities(&s).trim().to_string()
}),
) {
Some((fansub_id, fansub_name))
} else {
None
}
})
.collect_vec();
let fansub_info = url.fragment().and_then(|b| {
bangumi_fansubs
.iter()
.find_map(|(id, name)| if id == b { Some((id, name)) } else { None })
});
let bangumi_title = html
.select(&scraper::Selector::parse(".bangumi-title").unwrap())
.next()
.map(|el| {
decode_html_entities(&el.text().collect::<String>())
.trim()
.to_string()
})
.and_then(|title| if title.is_empty() { None } else { Some(title) })
.wrap_err_with(|| {
// todo: error handler
format!("Missing mikan bangumi official title for {}", url)
})?;
let MikanBangumiRssLink {
mikan_bangumi_id, ..
} = html
.select(&scraper::Selector::parse(".bangumi-title > .mikan-rss").unwrap())
.next()
.and_then(|el| el.value().attr("href"))
.as_ref()
.and_then(|s| url.join(s).ok())
.and_then(|rss_link_url| parse_mikan_bangumi_id_from_rss_link(&rss_link_url))
.wrap_err_with(|| {
// todo: error handler
format!("Missing mikan bangumi rss link or error format for {}", url)
})?;
let origin_poster_src = html
.select(&scraper::Selector::parse(".bangumi-poster").unwrap())
.next()
.and_then(|el| el.value().attr("style"))
.as_ref()
.and_then(|s| parse_style_attr(s))
.and_then(|style| {
style.iter().find_map(|(prop, _)| {
match prop {
Property::BackgroundImage(images) => {
for img in images {
if let CSSImage::Url(path) = img {
if let Ok(url) =
Url::parse(&url_host).and_then(|s| s.join(path.url.trim()))
{
return Some(url);
}
}
}
}
Property::Background(backgrounds) => {
for bg in backgrounds {
if let CSSImage::Url(path) = &bg.image {
if let Ok(url) =
Url::parse(&url_host).and_then(|s| s.join(path.url.trim()))
{
return Some(url);
}
}
}
}
_ => {}
}
None
})
})
.map(|mut origin_poster_src| {
origin_poster_src.set_query(None);
origin_poster_src
});
Ok(MikanBangumiMeta {
homepage: url,
bangumi_title,
origin_poster_src,
mikan_bangumi_id,
fansub: fansub_info.map(|s| s.1.to_string()),
mikan_fansub_id: fansub_info.map(|s| s.0.to_string()),
mikan_fansub_candidates: bangumi_fansubs.clone(),
})
}
pub async fn parse_mikan_episode_meta_from_mikan_homepage(
client: Option<&AppMikanClient>,
url: Url,
) -> eyre::Result<MikanEpisodeMeta> {
let http_client = client.map(|s| s.deref());
let url_host = url.origin().unicode_serialization();
let content = download_html_with_client(http_client, url.as_str()).await?;
let html = Html::parse_document(&content);
let bangumi_title = html
.select(&scraper::Selector::parse(".bangumi-title").unwrap())
.next()
.map(|el| {
decode_html_entities(&el.text().collect::<String>())
.trim()
.to_string()
})
.and_then(|title| if title.is_empty() { None } else { Some(title) })
.wrap_err_with(|| {
// todo: error handler
format!("Missing mikan bangumi official title for {}", url)
})?;
let episode_title = html
.select(&scraper::Selector::parse("title").unwrap())
.next()
.map(|el| {
decode_html_entities(&el.text().collect::<String>())
.replace(" - Mikan Project", "")
.trim()
.to_string()
})
.and_then(|title| if title.is_empty() { None } else { Some(title) })
.wrap_err_with(|| {
// todo: error handler
format!("Missing mikan episode official title for {}", url)
})?;
let (mikan_bangumi_id, mikan_fansub_id) = html
.select(&scraper::Selector::parse(".bangumi-title > .mikan-rss").unwrap())
.next()
.and_then(|el| el.value().attr("href"))
.as_ref()
.and_then(|s| url.join(s).ok())
.and_then(|rss_link_url| parse_mikan_bangumi_id_from_rss_link(&rss_link_url))
.and_then(
|MikanBangumiRssLink {
mikan_bangumi_id,
mikan_fansub_id,
..
}| {
mikan_fansub_id.map(|mikan_fansub_id| (mikan_bangumi_id, mikan_fansub_id))
},
)
.wrap_err_with(|| {
// todo: error handler
format!("Missing mikan bangumi rss link or error format for {}", url)
})?;
let fansub = html
.select(&scraper::Selector::parse(".bangumi-info>.magnet-link-wrap").unwrap())
.next()
.map(|el| {
decode_html_entities(&el.text().collect::<String>())
.trim()
.to_string()
})
.wrap_err_with(|| {
// todo: error handler
format!("Missing mikan bangumi fansub name for {}", url)
})?;
let origin_poster_src = html
.select(&scraper::Selector::parse(".bangumi-poster").unwrap())
.next()
.and_then(|el| el.value().attr("style"))
.as_ref()
.and_then(|s| parse_style_attr(s))
.and_then(|style| {
style.iter().find_map(|(prop, _)| {
match prop {
Property::BackgroundImage(images) => {
for img in images {
if let CSSImage::Url(path) = img {
if let Ok(url) =
Url::parse(&url_host).and_then(|s| s.join(path.url.trim()))
{
return Some(url);
}
}
}
}
Property::Background(backgrounds) => {
for bg in backgrounds {
if let CSSImage::Url(path) = &bg.image {
if let Ok(url) =
Url::parse(&url_host).and_then(|s| s.join(path.url.trim()))
{
return Some(url);
}
}
}
}
_ => {}
}
None
})
})
.map(|mut origin_poster_src| {
origin_poster_src.set_query(None);
origin_poster_src
});
let MikanEpisodeHomepage {
mikan_episode_id, ..
} = parse_mikan_episode_id_from_homepage(&url)
.wrap_err_with(|| format!("Failed to extract mikan_episode_id from {}", &url))?;
Ok(MikanEpisodeMeta {
mikan_bangumi_id,
mikan_fansub_id,
bangumi_title,
episode_title,
homepage: url,
origin_poster_src,
fansub,
mikan_episode_id,
})
}
#[cfg(test)]
mod test {
use std::assert_matches::assert_matches;
use url::Url;
use zune_image::{codecs::ImageFormat, image::Image};
use super::{
parse_mikan_bangumi_meta_from_mikan_homepage,
parse_mikan_bangumi_poster_from_origin_poster_src,
parse_mikan_episode_meta_from_mikan_homepage,
};
#[tokio::test]
async fn test_parse_mikan_episode() {
let test_fn = async || -> eyre::Result<()> {
let url_str =
"https://mikanani.me/Home/Episode/475184dce83ea2b82902592a5ac3343f6d54b36a";
let url = Url::parse(url_str)?;
let ep_meta = parse_mikan_episode_meta_from_mikan_homepage(None, url.clone()).await?;
assert_eq!(ep_meta.homepage, url);
assert_eq!(ep_meta.bangumi_title, "葬送的芙莉莲");
assert_eq!(
ep_meta.origin_poster_src,
Some(Url::parse(
"https://mikanani.me/images/Bangumi/202309/5ce9fed1.jpg"
)?)
);
assert_eq!(ep_meta.fansub, "LoliHouse");
assert_eq!(ep_meta.mikan_fansub_id, "370");
assert_eq!(ep_meta.mikan_bangumi_id, "3141");
assert_matches!(ep_meta.origin_poster_src, Some(..));
let bgm_poster = parse_mikan_bangumi_poster_from_origin_poster_src(
None,
ep_meta.origin_poster_src.unwrap(),
)
.await?;
let u8_data = bgm_poster.poster_data.expect("should have poster data");
let image = Image::read(u8_data.to_vec(), Default::default());
assert!(
image.is_ok_and(|img| img
.metadata()
.get_image_format()
.is_some_and(|fmt| matches!(fmt, ImageFormat::JPEG))),
"should start with valid jpeg data magic number"
);
Ok(())
};
test_fn().await.expect("test parse mikan failed");
}
#[tokio::test]
async fn test_parse_mikan_bangumi() {
let test_fn = async || -> eyre::Result<()> {
let url_str = "https://mikanani.me/Home/Bangumi/3416#370";
let url = Url::parse(url_str)?;
let bgm_meta = parse_mikan_bangumi_meta_from_mikan_homepage(None, url.clone()).await?;
assert_eq!(bgm_meta.homepage, url);
assert_eq!(bgm_meta.bangumi_title, "叹气的亡灵想隐退");
assert_eq!(
bgm_meta.origin_poster_src,
Some(Url::parse(
"https://mikanani.me/images/Bangumi/202410/480ef127.jpg"
)?)
);
assert_eq!(bgm_meta.fansub, Some(String::from("LoliHouse")));
assert_eq!(bgm_meta.mikan_fansub_id, Some(String::from("370")));
assert_eq!(bgm_meta.mikan_bangumi_id, "3416");
assert_eq!(
bgm_meta.homepage.as_str(),
"https://mikanani.me/Home/Bangumi/3416#370"
);
assert_eq!(bgm_meta.mikan_fansub_candidates.len(), 6);
Ok(())
};
test_fn().await.expect("test parse mikan failed");
}
}
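For reference, the episode-homepage helpers compose into a similar round trip (no scraping involved; the episode id is a placeholder taken from the test above):

use crate::extract::mikan::{
    build_mikan_episode_homepage, web_parser::parse_mikan_episode_id_from_homepage, MIKAN_BASE_URL,
};

fn demo() -> eyre::Result<()> {
    // Builds https://mikanani.me/Home/Episode/<mikan_episode_id>
    let homepage = build_mikan_episode_homepage(
        MIKAN_BASE_URL,
        "475184dce83ea2b82902592a5ac3343f6d54b36a",
    )?;

    // And recovers the id from any episode homepage url.
    let parsed = parse_mikan_episode_id_from_homepage(&homepage)
        .expect("a /Home/Episode/ url should parse");
    assert_eq!(
        parsed.mikan_episode_id,
        "475184dce83ea2b82902592a5ac3343f6d54b36a"
    );
    Ok(())
}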

View File

@ -2,6 +2,5 @@ pub mod defs;
pub mod errors;
pub mod html;
pub mod mikan;
pub mod raw;
pub mod title_parser;
pub mod rawname;
pub mod torrent;

View File

@ -0,0 +1,5 @@
pub mod parser;
pub use parser::{
extract_season_from_title_body, parse_episode_meta_from_raw_name, RawEpisodeMeta,
};

View File

@ -5,7 +5,7 @@ use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use crate::parsers::defs::{DIGIT_1PLUS_REG, ZH_NUM_MAP, ZH_NUM_RE};
use crate::extract::defs::{DIGIT_1PLUS_REG, ZH_NUM_MAP, ZH_NUM_RE};
const NAME_EXTRACT_REPLACE_ADHOC1_REPLACED: &str = "$1/$2";
@ -43,19 +43,19 @@ lazy_static! {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawEpisodeMeta {
name_en: Option<String>,
name_en_no_season: Option<String>,
name_jp: Option<String>,
name_jp_no_season: Option<String>,
name_zh: Option<String>,
name_zh_no_season: Option<String>,
season: i32,
season_raw: Option<String>,
episode_index: i32,
sub: Option<String>,
source: Option<String>,
fansub: Option<String>,
resolution: Option<String>,
pub name_en: Option<String>,
pub name_en_no_season: Option<String>,
pub name_jp: Option<String>,
pub name_jp_no_season: Option<String>,
pub name_zh: Option<String>,
pub name_zh_no_season: Option<String>,
pub season: i32,
pub season_raw: Option<String>,
pub episode_index: i32,
pub subtitle: Option<String>,
pub source: Option<String>,
pub fansub: Option<String>,
pub resolution: Option<String>,
}
fn extract_fansub(raw_name: &str) -> Option<&str> {
@ -110,7 +110,7 @@ fn title_body_pre_process(title_body: &str, fansub: Option<&str>) -> eyre::Resul
Ok(raw.to_string())
}
fn extract_season_from_title_body(title_body: &str) -> (String, Option<String>, i32) {
pub fn extract_season_from_title_body(title_body: &str) -> (String, Option<String>, i32) {
let name_and_season = EN_BRACKET_SPLIT_RE.replace_all(title_body, " ");
let seasons = SEASON_EXTRACT_SEASON_ALL_RE
.find(&name_and_season)
@ -300,7 +300,7 @@ pub fn parse_episode_meta_from_raw_name(s: &str) -> eyre::Result<RawEpisodeMeta>
season,
season_raw,
episode_index,
sub,
subtitle: sub,
source,
fansub: fansub.map(|s| s.to_string()),
resolution,

View File

@ -0,0 +1,3 @@
mod parser;
pub use parser::*;

View File

@ -5,7 +5,7 @@ use quirks_path::Path;
use regex::Regex;
use serde::{Deserialize, Serialize};
use crate::parsers::defs::SUBTITLE_LANG;
use crate::extract::defs::SUBTITLE_LANG;
lazy_static! {
static ref TORRENT_EP_PARSE_RULES: Vec<FancyRegex> = {
@ -52,11 +52,11 @@ fn get_fansub(group_and_title: &str) -> (Option<&str>, &str) {
.filter(|s| !s.is_empty())
.collect::<Vec<_>>();
match (n.get(0), n.get(1)) {
match (n.first(), n.get(1)) {
(None, None) => (None, ""),
(Some(n0), None) => (None, *n0),
(Some(n0), Some(n1)) => {
if GET_FANSUB_FULL_MATCH_RE.is_match(*n1) {
if GET_FANSUB_FULL_MATCH_RE.is_match(n1) {
(None, group_and_title)
} else {
(Some(*n0), *n1)
@ -94,7 +94,7 @@ fn get_subtitle_lang(media_name: &str) -> Option<&str> {
return Some(lang);
}
}
return None;
None
}
pub fn parse_episode_media_meta_from_torrent(
@ -272,7 +272,7 @@ mod tests {
let expected: Option<TorrentEpisodeSubtitleMeta> = serde_json::from_str(expected).ok();
let found_raw =
parse_episode_subtitle_meta_from_torrent(Path::new(raw_name), None, None);
let found = found_raw.as_ref().ok().map(|s| s.clone());
let found = found_raw.as_ref().ok().cloned();
if expected != found {
if found_raw.is_ok() {
@ -293,7 +293,7 @@ mod tests {
} else {
let expected: Option<TorrentEpisodeMediaMeta> = serde_json::from_str(expected).ok();
let found_raw = parse_episode_media_meta_from_torrent(Path::new(raw_name), None, None);
let found = found_raw.as_ref().ok().map(|s| s.clone());
let found = found_raw.as_ref().ok().cloned();
if expected != found {
if found_raw.is_ok() {

View File

@ -0,0 +1,24 @@
use bytes::Bytes;
use reqwest::IntoUrl;
use super::{core::DEFAULT_HTTP_CLIENT_USER_AGENT, HttpClient};
pub async fn download_bytes<T: IntoUrl>(url: T) -> eyre::Result<Bytes> {
let request_client = reqwest::Client::builder()
.user_agent(DEFAULT_HTTP_CLIENT_USER_AGENT)
.build()?;
let bytes = request_client.get(url).send().await?.bytes().await?;
Ok(bytes)
}
pub async fn download_bytes_with_client<T: IntoUrl>(
client: Option<&HttpClient>,
url: T,
) -> eyre::Result<Bytes> {
if let Some(client) = client {
let bytes = client.get(url).send().await?.bytes().await?;
Ok(bytes)
} else {
download_bytes(url).await
}
}

View File

@ -0,0 +1,96 @@
use std::{ops::Deref, time::Duration};
use axum::http::Extensions;
use leaky_bucket::RateLimiter;
use reqwest::{ClientBuilder, Request, Response};
use reqwest_middleware::{
ClientBuilder as ClientWithMiddlewareBuilder, ClientWithMiddleware, Next,
};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use reqwest_tracing::TracingMiddleware;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use super::DEFAULT_HTTP_CLIENT_USER_AGENT;
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct HttpClientConfig {
pub exponential_backoff_max_retries: Option<u32>,
pub leaky_bucket_max_tokens: Option<u32>,
pub leaky_bucket_initial_tokens: Option<u32>,
pub leaky_bucket_refill_tokens: Option<u32>,
#[serde_as(as = "Option<serde_with::DurationMilliSeconds>")]
pub leaky_bucket_refill_interval: Option<Duration>,
pub user_agent: Option<String>,
}
pub struct HttpClient {
client: ClientWithMiddleware,
}
impl Deref for HttpClient {
type Target = ClientWithMiddleware;
fn deref(&self) -> &Self::Target {
&self.client
}
}
pub struct RateLimiterMiddleware {
rate_limiter: RateLimiter,
}
#[async_trait::async_trait]
impl reqwest_middleware::Middleware for RateLimiterMiddleware {
async fn handle(
&self,
req: Request,
extensions: &'_ mut Extensions,
next: Next<'_>,
) -> reqwest_middleware::Result<Response> {
self.rate_limiter.acquire_one().await;
next.run(req, extensions).await
}
}
impl HttpClient {
pub fn new(config: Option<HttpClientConfig>) -> reqwest::Result<Self> {
let mut config = config.unwrap_or_default();
let retry_policy = ExponentialBackoff::builder()
.build_with_max_retries(config.exponential_backoff_max_retries.take().unwrap_or(3));
let rate_limiter = RateLimiter::builder()
.max(config.leaky_bucket_max_tokens.take().unwrap_or(3) as usize)
.initial(
config
.leaky_bucket_initial_tokens
.take()
.unwrap_or_default() as usize,
)
.refill(config.leaky_bucket_refill_tokens.take().unwrap_or(1) as usize)
.interval(
config
.leaky_bucket_refill_interval
.take()
.unwrap_or_else(|| Duration::from_millis(500)),
)
.build();
let client = ClientBuilder::new()
.user_agent(
config
.user_agent
.take()
.unwrap_or_else(|| DEFAULT_HTTP_CLIENT_USER_AGENT.to_owned()),
)
.build()?;
Ok(Self {
client: ClientWithMiddlewareBuilder::new(client)
.with(TracingMiddleware::default())
.with(RateLimiterMiddleware { rate_limiter })
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build(),
})
}
}
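A minimal construction sketch for the rate-limited client (the field values are illustrative, not recommendations):

use std::time::Duration;

use crate::fetch::{HttpClient, HttpClientConfig};

async fn demo() -> eyre::Result<()> {
    let client = HttpClient::new(Some(HttpClientConfig {
        exponential_backoff_max_retries: Some(3),
        leaky_bucket_max_tokens: Some(3),
        leaky_bucket_initial_tokens: Some(0),
        leaky_bucket_refill_tokens: Some(1),
        leaky_bucket_refill_interval: Some(Duration::from_millis(500)),
        user_agent: None, // falls back to DEFAULT_HTTP_CLIENT_USER_AGENT
    }))?;

    // HttpClient derefs to reqwest_middleware::ClientWithMiddleware, so the usual
    // reqwest request-builder API is available, now retried and rate limited.
    let body = client.get("https://mikanani.me").send().await?.text().await?;
    println!("fetched {} bytes", body.len());
    Ok(())
}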

View File

@ -0,0 +1 @@
pub const DEFAULT_HTTP_CLIENT_USER_AGENT: &str = "Wget/1.13.4 (linux-gnu)";

View File

@ -0,0 +1,23 @@
use reqwest::IntoUrl;
use super::{core::DEFAULT_HTTP_CLIENT_USER_AGENT, HttpClient};
pub async fn download_html<U: IntoUrl>(url: U) -> eyre::Result<String> {
let request_client = reqwest::Client::builder()
.user_agent(DEFAULT_HTTP_CLIENT_USER_AGENT)
.build()?;
let content = request_client.get(url).send().await?.text().await?;
Ok(content)
}
pub async fn download_html_with_client<T: IntoUrl>(
client: Option<&HttpClient>,
url: T,
) -> eyre::Result<String> {
if let Some(client) = client {
let content = client.get(url).send().await?.text().await?;
Ok(content)
} else {
download_html(url).await
}
}

View File

@ -0,0 +1,18 @@
use bytes::Bytes;
use reqwest::IntoUrl;
use super::{
bytes::{download_bytes, download_bytes_with_client},
HttpClient,
};
pub async fn download_image<U: IntoUrl>(url: U) -> eyre::Result<Bytes> {
download_bytes(url).await
}
pub async fn download_image_with_client<T: IntoUrl>(
client: Option<&HttpClient>,
url: T,
) -> eyre::Result<Bytes> {
download_bytes_with_client(client, url).await
}

View File

@ -0,0 +1,11 @@
pub mod bytes;
pub mod client;
pub mod core;
pub mod html;
pub mod image;
pub use core::DEFAULT_HTTP_CLIENT_USER_AGENT;
pub use bytes::download_bytes;
pub use client::{HttpClient, HttpClientConfig};
pub use image::download_image;

View File

@ -1,14 +1,13 @@
#![feature(async_closure, duration_constructors)]
#![feature(duration_constructors, assert_matches)]
pub mod app;
pub mod config;
pub mod controllers;
pub mod dal;
pub mod downloaders;
pub mod extract;
pub mod fetch;
pub mod migrations;
pub mod models;
pub mod parsers;
pub mod path;
pub mod tasks;
pub mod views;
pub mod workers;

View File

@ -1,4 +1,4 @@
use std::{collections::HashSet, fmt::Display};
use std::collections::HashSet;
use sea_orm::{DeriveIden, Statement};
use sea_orm_migration::prelude::{extension::postgres::IntoTypeRef, *};
@ -18,17 +18,17 @@ pub enum Subscribers {
Pid,
DisplayName,
DownloaderId,
BangumiConf,
}
#[derive(DeriveIden)]
pub enum Subscriptions {
Table,
Id,
SubscriberId,
DisplayName,
SubscriberId,
Category,
SourceUrl,
Aggregate,
Enabled,
}
@ -36,32 +36,61 @@ pub enum Subscriptions {
pub enum Bangumi {
Table,
Id,
MikanBangumiId,
DisplayName,
SubscriptionId,
RawName,
Season,
SeasonRaw,
Fansub,
MikanFansubId,
Filter,
RssLink,
PosterLink,
SavePath,
Deleted,
Homepage,
Extra,
}
#[derive(DeriveIden)]
pub enum Episodes {
Table,
Id,
MikanEpisodeId,
RawName,
DisplayName,
BangumiId,
OutputName,
SubscriptionId,
DownloadId,
SavePath,
Resolution,
Season,
SeasonRaw,
Fansub,
PosterLink,
EpisodeIndex,
Homepage,
Subtitle,
Deleted,
Source,
Extra,
}
#[derive(DeriveIden)]
pub enum Downloads {
Table,
Id,
SubscriptionId,
OriginalName,
DisplayName,
SubscriptionId,
Status,
CurrSize,
AllSize,
Mime,
Url,
Homepage,
SavePath,
}
#[derive(DeriveIden)]
@ -73,7 +102,17 @@ pub enum Downloaders {
Password,
Username,
SubscriberId,
DownloadPath,
SavePath,
}
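/// Create a Postgres enum type for the given ActiveEnum variants, converting
/// each variant to its string value via `ActiveEnum::to_value` before handing
/// them to `create_postgres_enum_for_active_enum` on the schema manager.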
macro_rules! create_postgres_enum_for_active_enum {
($manager: expr, $active_enum: expr, $($enum_value:expr),+) => {
{
use sea_orm::ActiveEnum;
let values = [$($enum_value,)+].map(|v| ActiveEnum::to_value(&v));
($manager).create_postgres_enum_for_active_enum($active_enum, values)
}
};
}
#[async_trait::async_trait]
@ -151,8 +190,7 @@ pub trait CustomSchemaManagerExt {
async fn create_postgres_enum_for_active_enum<
E: IntoTypeRef + IntoIden + Send + Clone,
T: Display + Send,
I: IntoIterator<Item = T> + Send,
I: IntoIterator<Item = String> + Send,
>(
&self,
enum_name: E,
@ -161,8 +199,7 @@ pub trait CustomSchemaManagerExt {
async fn add_postgres_enum_values_for_active_enum<
E: IntoTypeRef + IntoIden + Send + Clone,
T: Display + Send,
I: IntoIterator<Item = T> + Send,
I: IntoIterator<Item = String> + Send,
>(
&self,
enum_name: E,
@ -186,7 +223,7 @@ pub trait CustomSchemaManagerExt {
}
#[async_trait::async_trait]
impl<'c> CustomSchemaManagerExt for SchemaManager<'c> {
impl CustomSchemaManagerExt for SchemaManager<'_> {
async fn create_postgres_auto_update_ts_fn(&self, col_name: &str) -> Result<(), DbErr> {
let sql = format!(
"CREATE OR REPLACE FUNCTION update_{col_name}_column() RETURNS TRIGGER AS $$ BEGIN \
@ -239,8 +276,7 @@ impl<'c> CustomSchemaManagerExt for SchemaManager<'c> {
async fn create_postgres_enum_for_active_enum<
E: IntoTypeRef + IntoIden + Send + Clone,
T: Display + Send,
I: IntoIterator<Item = T> + Send,
I: IntoIterator<Item = String> + Send,
>(
&self,
enum_name: E,
@ -248,10 +284,7 @@ impl<'c> CustomSchemaManagerExt for SchemaManager<'c> {
) -> Result<(), DbErr> {
let existed = self.if_postgres_enum_exists(enum_name.clone()).await?;
if !existed {
let idents = values
.into_iter()
.map(|v| Alias::new(v.to_string()))
.collect::<Vec<_>>();
let idents = values.into_iter().map(Alias::new).collect::<Vec<_>>();
self.create_type(Type::create().as_enum(enum_name).values(idents).to_owned())
.await?;
} else {
@ -263,8 +296,7 @@ impl<'c> CustomSchemaManagerExt for SchemaManager<'c> {
async fn add_postgres_enum_values_for_active_enum<
E: IntoTypeRef + IntoIden + Send + Clone,
T: Display + Send,
I: IntoIterator<Item = T> + Send,
I: IntoIterator<Item = String> + Send,
>(
&self,
enum_name: E,
@ -273,7 +305,7 @@ impl<'c> CustomSchemaManagerExt for SchemaManager<'c> {
let exists_values = self.get_postgres_enum_values(enum_name.clone()).await?;
let to_add_values = values
.into_iter()
.filter(|v| !exists_values.contains(&v.to_string()))
.filter(|v| !exists_values.contains(v as &str))
.collect::<Vec<_>>();
if to_add_values.is_empty() {
@ -283,7 +315,7 @@ impl<'c> CustomSchemaManagerExt for SchemaManager<'c> {
let mut type_alter = Type::alter().name(enum_name);
for v in to_add_values {
type_alter = type_alter.add_value(Alias::new(v.to_string()));
type_alter = type_alter.add_value(Alias::new(v));
}
self.alter_type(type_alter.to_owned()).await?;
@ -294,8 +326,10 @@ impl<'c> CustomSchemaManagerExt for SchemaManager<'c> {
&self,
enum_name: E,
) -> Result<(), DbErr> {
self.drop_type(Type::drop().name(enum_name).to_owned())
.await?;
if self.if_postgres_enum_exists(enum_name.clone()).await? {
self.drop_type(Type::drop().name(enum_name).to_owned())
.await?;
}
Ok(())
}

View File

@ -1,9 +1,13 @@
use loco_rs::schema::jsonb_null;
use sea_orm_migration::{prelude::*, schema::*};
use super::defs::{
Bangumi, CustomSchemaManagerExt, Episodes, GeneralIds, Subscribers, Subscriptions,
};
use crate::models::{subscribers::ROOT_SUBSCRIBER, subscriptions};
use crate::models::{
subscribers::ROOT_SUBSCRIBER,
subscriptions::{self, SubscriptionCategoryEnum},
};
#[derive(DeriveMigrationName)]
pub struct Migration;
@ -14,15 +18,18 @@ impl MigrationTrait for Migration {
manager
.create_postgres_auto_update_ts_fn_for_col(GeneralIds::UpdatedAt)
.await?;
manager
.create_table(
table_auto(Subscribers::Table)
.col(pk_auto(Subscribers::Id))
.col(string_len_uniq(Subscribers::Pid, 64))
.col(string(Subscribers::DisplayName))
.col(jsonb_null(Subscribers::BangumiConf))
.to_owned(),
)
.await?;
manager
.create_postgres_auto_update_ts_trigger_for_col(
Subscribers::Table,
@ -37,15 +44,13 @@ impl MigrationTrait for Migration {
.to_owned();
manager.exec_stmt(insert).await?;
manager
.create_postgres_enum_for_active_enum(
subscriptions::SubscriptionCategoryEnum,
&[
subscriptions::SubscriptionCategory::Mikan,
subscriptions::SubscriptionCategory::Manual,
],
)
.await?;
create_postgres_enum_for_active_enum!(
manager,
subscriptions::SubscriptionCategoryEnum,
subscriptions::SubscriptionCategory::Mikan,
subscriptions::SubscriptionCategory::Manual
)
.await?;
manager
.create_table(
@ -54,7 +59,6 @@ impl MigrationTrait for Migration {
.col(string(Subscriptions::DisplayName))
.col(integer(Subscriptions::SubscriberId))
.col(text(Subscriptions::SourceUrl))
.col(boolean(Subscriptions::Aggregate))
.col(boolean(Subscriptions::Enabled))
.col(enumeration(
Subscriptions::Category,
@ -63,7 +67,7 @@ impl MigrationTrait for Migration {
))
.foreign_key(
ForeignKey::create()
.name("fk_subscription_subscriber_id")
.name("fk_subscriptions_subscriber_id")
.from(Subscriptions::Table, Subscriptions::SubscriberId)
.to(Subscribers::Table, Subscribers::Id)
.on_update(ForeignKeyAction::Restrict)
@ -84,8 +88,21 @@ impl MigrationTrait for Migration {
.create_table(
table_auto(Bangumi::Table)
.col(pk_auto(Bangumi::Id))
.col(text(Bangumi::DisplayName))
.col(text_null(Bangumi::MikanBangumiId))
.col(integer(Bangumi::SubscriptionId))
.col(text(Bangumi::DisplayName))
.col(text(Bangumi::RawName))
.col(integer(Bangumi::Season))
.col(text_null(Bangumi::SeasonRaw))
.col(text_null(Bangumi::Fansub))
.col(text_null(Bangumi::MikanFansubId))
.col(jsonb_null(Bangumi::Filter))
.col(text_null(Bangumi::RssLink))
.col(text_null(Bangumi::PosterLink))
.col(text_null(Bangumi::SavePath))
.col(boolean(Bangumi::Deleted).default(false))
.col(text_null(Bangumi::Homepage))
.col(jsonb_null(Bangumi::Extra))
.foreign_key(
ForeignKey::create()
.name("fk_bangumi_subscription_id")
@ -98,6 +115,28 @@ impl MigrationTrait for Migration {
)
.await?;
manager
.create_index(
Index::create()
.if_not_exists()
.name("idx_bangumi_mikan_bangumi_id")
.table(Bangumi::Table)
.col(Bangumi::MikanBangumiId)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.if_not_exists()
.name("idx_bangumi_mikan_fansub_id")
.table(Bangumi::Table)
.col(Bangumi::MikanFansubId)
.to_owned(),
)
.await?;
manager
.create_postgres_auto_update_ts_trigger_for_col(Bangumi::Table, GeneralIds::UpdatedAt)
.await?;
@ -106,12 +145,34 @@ impl MigrationTrait for Migration {
.create_table(
table_auto(Episodes::Table)
.col(pk_auto(Episodes::Id))
.col(text_null(Episodes::MikanEpisodeId))
.col(text(Episodes::RawName))
.col(text(Episodes::DisplayName))
.col(integer(Episodes::BangumiId))
.col(text(Episodes::OutputName))
.col(integer(Episodes::SubscriptionId))
.col(text_null(Episodes::SavePath))
.col(text_null(Episodes::Resolution))
.col(integer(Episodes::Season))
.col(text_null(Episodes::SeasonRaw))
.col(text_null(Episodes::Fansub))
.col(text_null(Episodes::PosterLink))
.col(integer(Episodes::EpisodeIndex))
.col(text_null(Episodes::Homepage))
.col(text_null(Episodes::Subtitle))
.col(boolean(Episodes::Deleted).default(false))
.col(text_null(Episodes::Source))
.col(jsonb_null(Episodes::Extra))
.foreign_key(
ForeignKey::create()
.name("fk_episode_bangumi_id")
.name("fk_episodes_subscription_id")
.from(Episodes::Table, Episodes::SubscriptionId)
.to(Subscriptions::Table, Subscriptions::Id)
.on_update(ForeignKeyAction::Restrict)
.on_delete(ForeignKeyAction::Cascade),
)
.foreign_key(
ForeignKey::create()
.name("fk_episodes_bangumi_id")
.from(Episodes::Table, Episodes::BangumiId)
.to(Bangumi::Table, Bangumi::Id)
.on_update(ForeignKeyAction::Restrict)
@ -121,6 +182,30 @@ impl MigrationTrait for Migration {
)
.await?;
manager
.create_index(
Index::create()
.if_not_exists()
.name("idx_episodes_mikan_episode_id")
.table(Episodes::Table)
.col(Episodes::MikanEpisodeId)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.if_not_exists()
.name("idx_episodes_bangumi_id_mikan_episode_id")
.table(Episodes::Table)
.col(Episodes::BangumiId)
.col(Episodes::MikanEpisodeId)
.unique()
.to_owned(),
)
.await?;
manager
.create_postgres_auto_update_ts_trigger_for_col(Episodes::Table, GeneralIds::UpdatedAt)
.await?;
@ -129,18 +214,24 @@ impl MigrationTrait for Migration {
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(Episodes::Table).to_owned())
.await?;
manager
.drop_postgres_auto_update_ts_trigger_for_col(Episodes::Table, GeneralIds::UpdatedAt)
.await?;
manager
.drop_table(Table::drop().table(Episodes::Table).to_owned())
.drop_table(Table::drop().table(Bangumi::Table).to_owned())
.await?;
manager
.drop_postgres_auto_update_ts_trigger_for_col(Bangumi::Table, GeneralIds::UpdatedAt)
.await?;
manager
.drop_table(Table::drop().table(Bangumi::Table).to_owned())
.drop_table(Table::drop().table(Subscriptions::Table).to_owned())
.await?;
manager
@ -149,8 +240,9 @@ impl MigrationTrait for Migration {
GeneralIds::UpdatedAt,
)
.await?;
manager
.drop_table(Table::drop().table(Subscriptions::Table).to_owned())
.drop_table(Table::drop().table(Subscribers::Table).to_owned())
.await?;
manager
@ -158,11 +250,15 @@ impl MigrationTrait for Migration {
.await?;
manager
.drop_table(Table::drop().table(Subscribers::Table).to_owned())
.drop_postgres_enum_for_active_enum(subscriptions::SubscriptionCategoryEnum)
.await?;
manager
.drop_postgres_enum_for_active_enum(subscriptions::SubscriptionCategoryEnum)
.drop_postgres_auto_update_ts_fn_for_col(GeneralIds::UpdatedAt)
.await?;
manager
.drop_postgres_enum_for_active_enum(SubscriptionCategoryEnum)
.await?;
Ok(())

View File

@ -13,26 +13,25 @@ pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_postgres_enum_for_active_enum(
DownloadMimeEnum,
&[DownloadMime::OctetStream, DownloadMime::BitTorrent],
)
.await?;
create_postgres_enum_for_active_enum!(
manager,
DownloadMimeEnum,
DownloadMime::BitTorrent,
DownloadMime::OctetStream
)
.await?;
manager
.create_postgres_enum_for_active_enum(
DownloadStatusEnum,
&[
DownloadStatus::Pending,
DownloadStatus::Downloading,
DownloadStatus::Completed,
DownloadStatus::Failed,
DownloadStatus::Deleted,
DownloadStatus::Paused,
],
)
.await?;
create_postgres_enum_for_active_enum!(
manager,
DownloadStatusEnum,
DownloadStatus::Downloading,
DownloadStatus::Paused,
DownloadStatus::Pending,
DownloadStatus::Completed,
DownloadStatus::Failed,
DownloadStatus::Deleted
)
.await?;
manager
.create_table(
@ -54,15 +53,11 @@ impl MigrationTrait for Migration {
.col(big_unsigned(Downloads::AllSize))
.col(big_unsigned(Downloads::CurrSize))
.col(text(Downloads::Url))
.index(
Index::create()
.table(Downloads::Table)
.col(Downloads::Url)
.name("idx_download_url"),
)
.col(text_null(Downloads::Homepage))
.col(text_null(Downloads::SavePath))
.foreign_key(
ForeignKey::create()
.name("fk_download_subscription_id")
.name("fk_downloads_subscription_id")
.from(Downloads::Table, Downloads::SubscriptionId)
.to(Subscriptions::Table, Subscriptions::Id)
.on_update(ForeignKeyAction::Restrict)
@ -73,7 +68,14 @@ impl MigrationTrait for Migration {
.await?;
manager
.create_postgres_auto_update_ts_fn_for_col(GeneralIds::UpdatedAt)
.create_index(
Index::create()
.if_not_exists()
.name("idx_downloads_url")
.table(Downloads::Table)
.col(Downloads::Url)
.to_owned(),
)
.await?;
manager
@ -83,7 +85,7 @@ impl MigrationTrait for Migration {
.add_column_if_not_exists(integer_null(Episodes::DownloadId))
.add_foreign_key(
TableForeignKey::new()
.name("fk_episode_download_id")
.name("fk_episodes_download_id")
.from_tbl(Episodes::Table)
.from_col(Episodes::DownloadId)
.to_tbl(Downloads::Table)
@ -103,16 +105,12 @@ impl MigrationTrait for Migration {
.alter_table(
Table::alter()
.table(Episodes::Table)
.drop_foreign_key(Alias::new("fk_episode_download_id"))
.drop_foreign_key(Alias::new("fk_episodes_download_id"))
.drop_column(Episodes::DownloadId)
.to_owned(),
)
.await?;
manager
.drop_postgres_auto_update_ts_fn_for_col(GeneralIds::UpdatedAt)
.await?;
manager
.drop_table(Table::drop().table(Downloads::Table).to_owned())
.await?;
@ -120,6 +118,7 @@ impl MigrationTrait for Migration {
manager
.drop_postgres_enum_for_active_enum(DownloadMimeEnum)
.await?;
manager
.drop_postgres_enum_for_active_enum(DownloadStatusEnum)
.await?;

View File

@ -11,12 +11,12 @@ pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_postgres_enum_for_active_enum(
DownloaderCategoryEnum,
&[DownloaderCategory::QBittorrent],
)
.await?;
create_postgres_enum_for_active_enum!(
manager,
DownloaderCategoryEnum,
DownloaderCategory::QBittorrent
)
.await?;
manager
.create_table(
@ -30,7 +30,7 @@ impl MigrationTrait for Migration {
DownloaderCategoryEnum,
DownloaderCategory::iden_values(),
))
.col(text(Downloaders::DownloadPath))
.col(text(Downloaders::SavePath))
.col(integer(Downloaders::SubscriberId))
.foreign_key(
ForeignKey::create()
@ -60,7 +60,7 @@ impl MigrationTrait for Migration {
.add_column_if_not_exists(integer_null(Subscribers::DownloaderId))
.add_foreign_key(
TableForeignKey::new()
.name("fk_subscriber_downloader_id")
.name("fk_subscribers_downloader_id")
.from_tbl(Subscribers::Table)
.from_col(Subscribers::DownloaderId)
.to_tbl(Downloaders::Table)
@ -79,7 +79,7 @@ impl MigrationTrait for Migration {
.alter_table(
Table::alter()
.table(Subscribers::Table)
.drop_foreign_key(Alias::new("fk_subscriber_downloader_id"))
.drop_foreign_key(Alias::new("fk_subscribers_downloader_id"))
.drop_column(Subscribers::DownloaderId)
.to_owned(),
)

View File

@ -1,5 +1,6 @@
pub use sea_orm_migration::prelude::*;
#[macro_use]
pub mod defs;
pub mod m20220101_000001_init;
pub mod m20240224_082543_add_downloads;

View File

@ -1,6 +1,43 @@
use sea_orm::entity::prelude::*;
use loco_rs::app::AppContext;
use sea_orm::{entity::prelude::*, ActiveValue, TryIntoModel};
pub use super::entities::bangumi::*;
impl Model {
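/// Find a bangumi by its (mikan_bangumi_id, mikan_fansub_id) pair, or insert a
/// new row bound to `subscription_id`, letting the async closure `f` fill in
/// the remaining fields before the first save.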
pub async fn get_or_insert_from_mikan<F>(
ctx: &AppContext,
subscription_id: i32,
mikan_bangumi_id: String,
mikan_fansub_id: String,
f: F,
) -> eyre::Result<Model>
where
F: AsyncFnOnce(&mut ActiveModel) -> eyre::Result<()>,
{
let db = &ctx.db;
if let Some(existed) = Entity::find()
.filter(
Column::MikanBangumiId
.eq(Some(mikan_bangumi_id.clone()))
.and(Column::MikanFansubId.eq(Some(mikan_fansub_id.clone()))),
)
.one(db)
.await?
{
Ok(existed)
} else {
let mut bgm = ActiveModel {
mikan_bangumi_id: ActiveValue::Set(Some(mikan_bangumi_id)),
mikan_fansub_id: ActiveValue::Set(Some(mikan_fansub_id)),
subscription_id: ActiveValue::Set(subscription_id),
..Default::default()
};
f(&mut bgm).await?;
let bgm: Model = bgm.save(db).await?.try_into_model()?;
Ok(bgm)
}
}
}
#[async_trait::async_trait]
impl ActiveModelBehavior for ActiveModel {}

View File

@ -7,6 +7,9 @@ pub use crate::models::entities::downloaders::*;
impl ActiveModelBehavior for ActiveModel {}
impl Model {
pub fn get_endpoint(&self) -> String {
self.endpoint.clone()
}
pub fn endpoint_url(&self) -> Result<Url, url::ParseError> {
let url = Url::parse(&self.endpoint)?;
Ok(url)

View File

@ -1,11 +1,7 @@
use loco_rs::app::AppContext;
use sea_orm::{prelude::*, sea_query::OnConflict, ActiveValue, Condition, QueryOrder, QuerySelect};
use sea_orm::{prelude::*, ActiveValue};
use crate::extract::mikan::MikanRssItem;
pub use crate::models::entities::downloads::*;
use crate::{
models::subscriptions::{self, SubscriptionCategory},
parsers::mikan::{parse_mikan_rss_items_from_rss_link, MikanRssItem},
};
#[async_trait::async_trait]
impl ActiveModelBehavior for ActiveModel {}
@ -18,73 +14,14 @@ impl ActiveModel {
subscription_id: ActiveValue::Set(subscription_id),
status: ActiveValue::Set(DownloadStatus::Pending),
mime: ActiveValue::Set(DownloadMime::BitTorrent),
url: ActiveValue::Set(m.url),
url: ActiveValue::Set(m.url.to_string()),
curr_size: ActiveValue::Set(m.content_length.as_ref().map(|_| 0)),
all_size: ActiveValue::Set(m.content_length),
homepage: ActiveValue::Set(m.homepage),
homepage: ActiveValue::Set(Some(m.homepage.to_string())),
..Default::default()
};
todo!()
}
}
impl Model {
pub async fn pull_subscription(
ctx: AppContext,
item: &subscriptions::Model,
) -> eyre::Result<Vec<i32>> {
let db = &ctx.db;
match &item.category {
SubscriptionCategory::Mikan => {
let items = parse_mikan_rss_items_from_rss_link(&item.source_url).await?;
let all_items = items.collect::<Vec<_>>();
let last_old_id = {
Entity::find()
.select_only()
.column(Column::Id)
.order_by_desc(Column::Id)
.filter(Column::SubscriptionId.eq(item.id))
.one(db)
.await?
}
.map(|i| i.id);
if all_items.is_empty() {
return Ok(vec![]);
}
let new_items = all_items
.into_iter()
.map(|i| ActiveModel::from_mikan_rss_item(i, item.id));
let insert_result = Entity::insert_many(new_items)
.on_conflict(OnConflict::column(Column::Url).do_nothing().to_owned())
.exec(db)
.await?;
let insert_ids = Entity::find()
.select_only()
.column(Column::Id)
.filter({
let mut cond = Condition::all()
.add(Column::SubscriptionId.eq(item.id))
.add(Column::Id.lte(insert_result.last_insert_id));
if let Some(last_old_id) = last_old_id {
cond = cond.add(Column::Id.gt(last_old_id))
}
cond
})
.all(db)
.await?;
Ok(insert_ids.into_iter().map(|i| i.id).collect::<Vec<_>>())
}
_ => {
todo!("other subscription categories")
}
}
}
}
impl Model {}

View File

@ -7,6 +7,16 @@ pub struct BangumiFilter {
pub group: Option<Vec<String>>,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult)]
pub struct BangumiExtra {
pub name_zh: Option<String>,
pub s_name_zh: Option<String>,
pub name_en: Option<String>,
pub s_name_en: Option<String>,
pub name_jp: Option<String>,
pub s_name_jp: Option<String>,
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "bangumi")]
pub struct Model {
@ -14,17 +24,22 @@ pub struct Model {
pub updated_at: DateTime,
#[sea_orm(primary_key)]
pub id: i32,
pub mikan_bangumi_id: Option<String>,
pub subscription_id: i32,
pub display_name: String,
pub official_title: String,
pub raw_name: String,
pub season: i32,
pub season_raw: Option<String>,
pub fansub: Option<String>,
pub mikan_fansub_id: Option<String>,
pub filter: Option<BangumiFilter>,
pub rss_link: Option<String>,
pub poster_link: Option<String>,
pub save_path: Option<String>,
#[sea_orm(default = "false")]
pub deleted: bool,
pub homepage: Option<String>,
pub extra: Option<BangumiExtra>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -50,6 +50,7 @@ pub struct Model {
pub all_size: Option<u64>,
pub curr_size: Option<u64>,
pub homepage: Option<String>,
pub save_path: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -1,8 +1,17 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
use sea_orm::entity::prelude::*;
use sea_orm::{entity::prelude::*, FromJsonQueryResult};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult, Default)]
pub struct EpisodeExtra {
pub name_zh: Option<String>,
pub s_name_zh: Option<String>,
pub name_en: Option<String>,
pub s_name_en: Option<String>,
pub name_jp: Option<String>,
pub s_name_jp: Option<String>,
}
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "episodes")]
pub struct Model {
@ -10,27 +19,26 @@ pub struct Model {
pub updated_at: DateTime,
#[sea_orm(primary_key)]
pub id: i32,
#[sea_orm(indexed)]
pub mikan_episode_id: Option<String>,
pub raw_name: String,
pub official_title: String,
pub display_name: String,
pub name_zh: Option<String>,
pub name_jp: Option<String>,
pub name_en: Option<String>,
pub s_name_zh: Option<String>,
pub s_name_jp: Option<String>,
pub s_name_en: Option<String>,
pub bangumi_id: i32,
pub download_id: i32,
pub save_path: String,
pub subscription_id: i32,
pub download_id: Option<i32>,
pub save_path: Option<String>,
pub resolution: Option<String>,
pub season: i32,
pub season_raw: Option<String>,
pub fansub: Option<String>,
pub poster_link: Option<String>,
pub home_page: Option<String>,
pub episode_index: i32,
pub homepage: Option<String>,
pub subtitle: Option<Vec<String>>,
#[sea_orm(default = "false")]
pub deleted: bool,
pub source: Option<String>,
pub extra: EpisodeExtra,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@ -47,6 +55,12 @@ pub enum Relation {
to = "super::downloads::Column::Id"
)]
Downloads,
#[sea_orm(
belongs_to = "super::subscriptions::Entity",
from = "Column::SubscriptionId",
to = "super::subscriptions::Column::Id"
)]
Subscriptions,
}
impl Related<super::bangumi::Entity> for Entity {
@ -60,3 +74,9 @@ impl Related<super::downloads::Entity> for Entity {
Relation::Downloads.def()
}
}
impl Related<super::subscriptions::Entity> for Entity {
fn to() -> RelationDef {
Relation::Subscriptions.def()
}
}

View File

@ -19,7 +19,7 @@ pub struct Model {
pub pid: String,
pub display_name: String,
pub downloader_id: Option<i32>,
pub bangumi_conf: SubscriberBangumiConfig,
pub bangumi_conf: Option<SubscriberBangumiConfig>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@ -30,7 +30,6 @@ pub struct Model {
pub subscriber_id: i32,
pub category: SubscriptionCategory,
pub source_url: String,
pub aggregate: bool,
pub enabled: bool,
}
@ -44,6 +43,8 @@ pub enum Relation {
Subscriber,
#[sea_orm(has_many = "super::bangumi::Entity")]
Bangumi,
#[sea_orm(has_many = "super::episodes::Entity")]
Episodes,
}
impl Related<super::subscribers::Entity> for Entity {
@ -57,3 +58,9 @@ impl Related<super::bangumi::Entity> for Entity {
Relation::Bangumi.def()
}
}
impl Related<super::episodes::Entity> for Entity {
fn to() -> RelationDef {
Relation::Episodes.def()
}
}

View File

@ -1,6 +1,91 @@
use sea_orm::entity::prelude::*;
use std::sync::Arc;
use loco_rs::app::AppContext;
use sea_orm::{entity::prelude::*, sea_query::OnConflict, ActiveValue};
use super::bangumi;
pub use super::entities::episodes::*;
use crate::{
app::AppContextExt,
extract::{
mikan::{build_mikan_episode_homepage, MikanEpisodeMeta},
rawname::parse_episode_meta_from_raw_name,
},
};
#[derive(Clone, Debug, PartialEq)]
pub struct MikanEpsiodeCreation {
pub episode: MikanEpisodeMeta,
pub bangumi: Arc<bangumi::Model>,
}
impl Model {
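/// Batch-insert episodes built from Mikan metadata, ignoring conflicts on the
/// (bangumi_id, mikan_episode_id) pair so episodes already in the table are skipped.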
pub async fn add_episodes(
ctx: &AppContext,
creations: impl IntoIterator<Item = MikanEpsiodeCreation>,
) -> eyre::Result<()> {
let db = &ctx.db;
let new_episode_active_modes = creations
.into_iter()
.flat_map(|cr| ActiveModel::from_mikan_episode_meta(ctx, cr));
Entity::insert_many(new_episode_active_modes)
.on_conflict(
OnConflict::columns([Column::BangumiId, Column::MikanEpisodeId])
.do_nothing()
.to_owned(),
)
.exec(db)
.await?;
Ok(())
}
}
impl ActiveModel {
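/// Build an episode ActiveModel from Mikan metadata: the raw title is parsed for
/// season, fansub, resolution, subtitle and localized names, with missing fields
/// falling back to the parent bangumi.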
pub fn from_mikan_episode_meta(
ctx: &AppContext,
creation: MikanEpsiodeCreation,
) -> eyre::Result<Self> {
let item = creation.episode;
let bgm = creation.bangumi;
let raw_meta = parse_episode_meta_from_raw_name(&item.episode_title)?;
let homepage = build_mikan_episode_homepage(
ctx.get_mikan_client().base_url(),
&item.mikan_episode_id,
)?;
Ok(Self {
mikan_episode_id: ActiveValue::Set(Some(item.mikan_episode_id)),
raw_name: ActiveValue::Set(item.episode_title.clone()),
display_name: ActiveValue::Set(item.episode_title.clone()),
bangumi_id: ActiveValue::Set(bgm.id),
subscription_id: ActiveValue::Set(bgm.subscription_id),
resolution: ActiveValue::Set(raw_meta.resolution),
season: ActiveValue::Set(if raw_meta.season > 0 {
raw_meta.season
} else {
bgm.season
}),
season_raw: ActiveValue::Set(raw_meta.season_raw.or_else(|| bgm.season_raw.clone())),
fansub: ActiveValue::Set(raw_meta.fansub.or_else(|| bgm.fansub.clone())),
poster_link: ActiveValue::Set(bgm.poster_link.clone()),
episode_index: ActiveValue::Set(raw_meta.episode_index),
homepage: ActiveValue::Set(Some(homepage.to_string())),
subtitle: ActiveValue::Set(raw_meta.subtitle.map(|s| vec![s])),
source: ActiveValue::Set(raw_meta.source),
extra: ActiveValue::Set(EpisodeExtra {
name_zh: raw_meta.name_zh,
name_en: raw_meta.name_en,
name_jp: raw_meta.name_jp,
s_name_en: raw_meta.name_en_no_season,
s_name_jp: raw_meta.name_jp_no_season,
s_name_zh: raw_meta.name_zh_no_season,
}),
..Default::default()
})
}
}
#[async_trait::async_trait]
impl ActiveModelBehavior for ActiveModel {}

View File

@ -5,5 +5,6 @@ pub mod entities;
pub mod episodes;
pub mod notifications;
pub mod prelude;
pub mod query;
pub mod subscribers;
pub mod subscriptions;

View File

@ -2,7 +2,6 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Notification {
official_title: String,
season: i32,
episode_size: u32,
poster_url: Option<String>,

View File

@ -0,0 +1,26 @@
use sea_orm::{
prelude::Expr,
sea_query::{Alias, IntoColumnRef, IntoTableRef, Query, SelectStatement},
Value,
};
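/// Build a SELECT that reports which of `values` already exist in `col_ref` of
/// `tbl_ref`: the values are bound as a VALUES list aliased `t(column1)`,
/// left-joined against the table, and filtered to rows where the joined column
/// is not NULL.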
pub fn filter_values_in<
I: IntoIterator<Item = T>,
T: Into<Value>,
R: IntoTableRef,
C: IntoColumnRef + Copy,
>(
tbl_ref: R,
col_ref: C,
values: I,
) -> SelectStatement {
Query::select()
.expr(Expr::col((Alias::new("t"), Alias::new("column1"))))
.from_values(values, Alias::new("t"))
.left_join(
tbl_ref,
Expr::col((Alias::new("t"), Alias::new("column1"))).equals(col_ref),
)
.and_where(Expr::col(col_ref).is_not_null())
.to_owned()
}

View File

@ -1,4 +1,7 @@
use loco_rs::model::{ModelError, ModelResult};
use loco_rs::{
app::AppContext,
model::{ModelError, ModelResult},
};
use sea_orm::{entity::prelude::*, ActiveValue, TransactionTrait};
use serde::{Deserialize, Serialize};
@ -33,7 +36,8 @@ impl Model {
/// # Errors
///
/// When could not find user or DB query error
pub async fn find_by_pid(db: &DatabaseConnection, pid: &str) -> ModelResult<Self> {
pub async fn find_by_pid(ctx: &AppContext, pid: &str) -> ModelResult<Self> {
let db = &ctx.db;
let parse_uuid = Uuid::parse_str(pid).map_err(|e| ModelError::Any(e.into()))?;
let subscriber = Entity::find()
.filter(Column::Pid.eq(parse_uuid))
@ -42,8 +46,30 @@ impl Model {
subscriber.ok_or_else(|| ModelError::EntityNotFound)
}
pub async fn find_root(db: &DatabaseConnection) -> ModelResult<Self> {
Self::find_by_pid(db, ROOT_SUBSCRIBER).await
pub async fn find_by_id(ctx: &AppContext, id: i32) -> ModelResult<Self> {
let db = &ctx.db;
let subscriber = Entity::find_by_id(id).one(db).await?;
subscriber.ok_or_else(|| ModelError::EntityNotFound)
}
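/// Resolve a subscriber id to its pid, memoized in the app cache under the
/// `subscriber-id2pid::{id}` key.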
pub async fn find_pid_by_id_with_cache(ctx: &AppContext, id: i32) -> eyre::Result<String> {
let db = &ctx.db;
let cache = &ctx.cache;
let pid = cache
.get_or_insert(&format!("subscriber-id2pid::{}", id), async {
let subscriber = Entity::find_by_id(id)
.one(db)
.await?
.ok_or_else(|| loco_rs::Error::string(&format!("No such pid for id {}", id)))?;
Ok(subscriber.pid)
})
.await?;
Ok(pid)
}
pub async fn find_root(ctx: &AppContext) -> ModelResult<Self> {
Self::find_by_pid(ctx, ROOT_SUBSCRIBER).await
}
/// Asynchronously creates a user with a password and saves it to the
@ -52,7 +78,8 @@ impl Model {
/// # Errors
///
/// When could not save the user into the DB
pub async fn create_root(db: &DatabaseConnection) -> ModelResult<Self> {
pub async fn create_root(ctx: &AppContext) -> ModelResult<Self> {
let db = &ctx.db;
let txn = db.begin().await?;
let user = ActiveModel {

View File

@ -1,13 +1,33 @@
use std::{collections::HashSet, sync::Arc};
use itertools::Itertools;
use loco_rs::app::AppContext;
use sea_orm::{entity::prelude::*, ActiveValue};
use serde::{Deserialize, Serialize};
pub use super::entities::subscriptions::{self, *};
use super::{bangumi, episodes, query::filter_values_in};
use crate::{
app::AppContextExt,
extract::{
mikan::{
build_mikan_bangumi_homepage, build_mikan_bangumi_rss_link,
parse_mikan_bangumi_meta_from_mikan_homepage,
parse_mikan_episode_meta_from_mikan_homepage, parse_mikan_rss_channel_from_rss_link,
web_parser::{
parse_mikan_bangumi_poster_from_origin_poster_src_with_cache,
MikanBangumiPosterMeta,
},
},
rawname::extract_season_from_title_body,
},
models::episodes::MikanEpsiodeCreation,
};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SubscriptionCreateFromRssDto {
pub rss_link: String,
pub display_name: String,
pub aggregate: bool,
pub enabled: Option<bool>,
}
@ -37,7 +57,6 @@ impl ActiveModel {
Self {
display_name: ActiveValue::Set(create_dto.display_name),
enabled: ActiveValue::Set(create_dto.enabled.unwrap_or(false)),
aggregate: ActiveValue::Set(create_dto.aggregate),
subscriber_id: ActiveValue::Set(subscriber_id),
category: ActiveValue::Set(category),
source_url: ActiveValue::Set(create_dto.rss_link),
@ -48,20 +67,22 @@ impl ActiveModel {
impl Model {
pub async fn add_subscription(
db: &DatabaseConnection,
ctx: &AppContext,
create_dto: SubscriptionCreateDto,
subscriber_id: i32,
) -> eyre::Result<Self> {
let db = &ctx.db;
let subscription = ActiveModel::from_create_dto(create_dto, subscriber_id);
Ok(subscription.insert(db).await?)
}
pub async fn toggle_iters(
db: &DatabaseConnection,
ctx: &AppContext,
ids: impl Iterator<Item = i32>,
enabled: bool,
) -> eyre::Result<()> {
let db = &ctx.db;
Entity::update_many()
.col_expr(Column::Enabled, Expr::value(enabled))
.filter(Column::Id.is_in(ids))
@ -71,13 +92,137 @@ impl Model {
}
pub async fn delete_iters(
db: &DatabaseConnection,
ctx: &AppContext,
ids: impl Iterator<Item = i32>,
) -> eyre::Result<()> {
let db = &ctx.db;
Entity::delete_many()
.filter(Column::Id.is_in(ids))
.exec(db)
.await?;
Ok(())
}
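/// Pull the Mikan RSS feed behind this subscription: episodes whose
/// `mikan_episode_id` is already stored are skipped, metadata for the new ones
/// is scraped from their homepages and grouped by (bangumi, fansub), the bangumi
/// is fetched or inserted (including a cached poster), and the new episodes are
/// inserted. Non-Mikan categories are still `todo!()`.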
pub async fn pull_subscription(&self, ctx: &AppContext) -> eyre::Result<()> {
match &self.category {
SubscriptionCategory::Mikan => {
let mikan_client = ctx.get_mikan_client();
let channel =
parse_mikan_rss_channel_from_rss_link(Some(mikan_client), &self.source_url)
.await?;
let items = channel.into_items();
let db = &ctx.db;
let items = items.into_iter().collect_vec();
let mut stmt = filter_values_in(
episodes::Entity,
episodes::Column::MikanEpisodeId,
items
.iter()
.map(|s| Value::from(s.mikan_episode_id.clone())),
);
stmt.expr(Expr::col(episodes::Column::SubscriptionId))
.and_where(Expr::col(episodes::Column::SubscriptionId).eq(self.id));
let builder = &db.get_database_backend();
let old_rss_item_mikan_episode_ids_set = db
.query_all(builder.build(&stmt))
.await?
.into_iter()
.flat_map(|qs| qs.try_get_by_index::<String>(0))
.collect::<HashSet<String>>();
let new_rss_items = items
.into_iter()
.filter(|item| {
!old_rss_item_mikan_episode_ids_set.contains(&item.mikan_episode_id)
})
.collect_vec();
let mut new_metas = vec![];
for new_rss_item in new_rss_items.iter() {
new_metas.push(
parse_mikan_episode_meta_from_mikan_homepage(
Some(mikan_client),
new_rss_item.homepage.clone(),
)
.await?,
);
}
let new_mikan_bangumi_groups = new_metas
.into_iter()
.into_group_map_by(|s| (s.mikan_bangumi_id.clone(), s.mikan_fansub_id.clone()));
for ((mikan_bangumi_id, mikan_fansub_id), new_ep_metas) in new_mikan_bangumi_groups
{
let mikan_base_url = ctx.get_mikan_client().base_url();
let bgm_homepage = build_mikan_bangumi_homepage(
mikan_base_url,
&mikan_bangumi_id,
Some(&mikan_fansub_id),
)?;
let bgm_rss_link = build_mikan_bangumi_rss_link(
mikan_base_url,
&mikan_bangumi_id,
Some(&mikan_fansub_id),
)?;
let bgm = Arc::new(
bangumi::Model::get_or_insert_from_mikan(
ctx,
self.id,
mikan_bangumi_id.to_string(),
mikan_fansub_id.to_string(),
async |am| -> eyre::Result<()> {
let bgm_meta = parse_mikan_bangumi_meta_from_mikan_homepage(
Some(mikan_client),
bgm_homepage.clone(),
)
.await?;
let bgm_name = bgm_meta.bangumi_title;
let (_, bgm_season_raw, bgm_season) =
extract_season_from_title_body(&bgm_name);
am.raw_name = ActiveValue::Set(bgm_name.clone());
am.display_name = ActiveValue::Set(bgm_name);
am.season = ActiveValue::Set(bgm_season);
am.season_raw = ActiveValue::Set(bgm_season_raw);
am.rss_link = ActiveValue::Set(Some(bgm_rss_link.to_string()));
am.homepage = ActiveValue::Set(Some(bgm_homepage.to_string()));
am.fansub = ActiveValue::Set(bgm_meta.fansub);
if let Some(origin_poster_src) = bgm_meta.origin_poster_src {
if let MikanBangumiPosterMeta {
poster_src: Some(poster_src),
..
} = parse_mikan_bangumi_poster_from_origin_poster_src_with_cache(
ctx,
origin_poster_src,
self.subscriber_id,
)
.await?
{
am.poster_link = ActiveValue::Set(Some(poster_src))
}
}
Ok(())
},
)
.await?,
);
episodes::Model::add_episodes(
ctx,
new_ep_metas.into_iter().map(|item| MikanEpsiodeCreation {
episode: item,
bangumi: bgm.clone(),
}),
)
.await?;
}
Ok(())
}
_ => todo!(),
}
}
}

View File

@ -1,34 +0,0 @@
use lightningcss::declaration::DeclarationBlock;
pub fn query_selector_first<'a>(
dom: &'a tl::VDom<'a>,
selector: &'a str,
parser: &'a tl::Parser<'a>,
) -> Option<&'a tl::Node<'a>> {
dom.query_selector(selector)
.and_then(|mut s| s.next())
.and_then(|n| n.get(parser))
}
pub fn query_selector_first_tag<'a>(
dom: &'a tl::VDom<'a>,
selector: &'a str,
parser: &'a tl::Parser<'a>,
) -> Option<&'a tl::HTMLTag<'a>> {
query_selector_first(dom, selector, parser).and_then(|n| n.as_tag())
}
pub fn parse_style_attr(style_attr: &str) -> Option<DeclarationBlock> {
let result = DeclarationBlock::parse_string(style_attr, Default::default()).ok()?;
Some(result)
}
pub fn get_tag_style<'a>(tag: &'a tl::HTMLTag<'a>) -> Option<DeclarationBlock<'a>> {
let style_attr = tag
.attributes()
.get("style")
.flatten()
.and_then(|s| std::str::from_utf8(s.as_bytes()).ok());
style_attr.and_then(parse_style_attr)
}

View File

@ -1,3 +0,0 @@
pub mod html_parser_utils;
pub use html_parser_utils::{get_tag_style, query_selector_first_tag};

View File

@ -1,127 +0,0 @@
use bytes::Bytes;
use html_escape::decode_html_entities;
use lazy_static::lazy_static;
use lightningcss::{properties::Property, values::image::Image};
use regex::Regex;
use url::Url;
use crate::{
downloaders::{html::download_html, image::download_image},
parsers::html::{get_tag_style, query_selector_first_tag},
};
pub struct MikanEpisodeMeta {
pub homepage: Url,
pub poster_data: Option<Bytes>,
pub origin_poster_src: Option<Url>,
pub official_title: String,
}
lazy_static! {
static ref MIKAN_TITLE_SEASON: Regex = Regex::new("第.*季").unwrap();
}
pub async fn parse_episode_meta_from_mikan_homepage(
url: Url,
) -> eyre::Result<Option<MikanEpisodeMeta>> {
let url_host = url.origin().unicode_serialization();
let content = download_html(url.as_str()).await?;
let dom = tl::parse(&content, tl::ParserOptions::default())?;
let parser = dom.parser();
let poster_node = query_selector_first_tag(&dom, r"div.bangumi-poster", parser);
let official_title_node = query_selector_first_tag(&dom, r"p.bangumi-title", parser);
let mut origin_poster_src = None;
if let Some(style) = poster_node.and_then(get_tag_style) {
for (prop, _) in style.iter() {
match prop {
Property::BackgroundImage(images) => {
if let Some(Image::Url(path)) = images.first() {
if let Ok(url) = Url::parse(&url_host).and_then(|s| s.join(path.url.trim()))
{
origin_poster_src = Some(url);
}
}
}
Property::Background(backgrounds) => {
for bg in backgrounds {
if let Image::Url(path) = &bg.image {
if let Ok(url) =
Url::parse(&url_host).and_then(|s| s.join(path.url.trim()))
{
origin_poster_src = Some(url);
}
}
}
}
_ => {}
}
}
};
origin_poster_src = origin_poster_src.map(|mut p| {
p.set_query(None);
p
});
let poster_data = if let Some(p) = origin_poster_src.as_ref() {
download_image(p.clone()).await.ok()
} else {
None
};
let meta = official_title_node
.map(|s| s.inner_text(parser))
.and_then(|official_title| {
let title = MIKAN_TITLE_SEASON
.replace(&decode_html_entities(&official_title), "")
.trim()
.to_string();
if title.is_empty() {
None
} else {
Some(title)
}
})
.map(|title| MikanEpisodeMeta {
homepage: url,
poster_data,
official_title: title,
origin_poster_src,
});
Ok(meta)
}
#[cfg(test)]
mod test {
use url::Url;
use super::parse_episode_meta_from_mikan_homepage;
#[tokio::test]
async fn test_parse_mikan() {
let test_fn = async || -> eyre::Result<()> {
let url_str =
"https://mikanani.me/Home/Episode/475184dce83ea2b82902592a5ac3343f6d54b36a";
let url = Url::parse(url_str)?;
if let Some(ep_meta) = parse_episode_meta_from_mikan_homepage(url.clone()).await? {
assert_eq!(ep_meta.homepage, url);
assert_eq!(ep_meta.official_title, "葬送的芙莉莲");
assert_eq!(
ep_meta.origin_poster_src,
Some(Url::parse(
"https://mikanani.me/images/Bangumi/202309/5ce9fed1.jpg"
)?)
);
let u8_data = ep_meta.poster_data.expect("should have poster data");
assert!(
u8_data.starts_with(&[255, 216, 255, 224]),
"should start with valid jpeg data magic number"
);
} else {
panic!("can not find mikan episode title")
}
Ok(())
};
test_fn().await.expect("test parse mikan failed");
}
}

View File

@ -1,88 +0,0 @@
use chrono::DateTime;
use reqwest::IntoUrl;
use serde::{Deserialize, Serialize};
use crate::{
downloaders::{bytes::download_bytes, defs::BITTORRENT_MIME_TYPE},
parsers::errors::ParseError,
};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MikanRssItem {
pub title: String,
pub homepage: Option<String>,
pub url: String,
pub content_length: Option<u64>,
pub mime: String,
pub pub_date: Option<i64>,
}
impl TryFrom<rss::Item> for MikanRssItem {
type Error = ParseError;
fn try_from(item: rss::Item) -> Result<Self, Self::Error> {
let mime_type = item
.enclosure()
.map(|x| x.mime_type.to_string())
.unwrap_or_default();
if mime_type == BITTORRENT_MIME_TYPE {
let enclosure = item.enclosure.unwrap();
Ok(MikanRssItem {
title: item.title.unwrap_or_default(),
homepage: item.link,
url: enclosure.url,
content_length: enclosure.length.parse().ok(),
mime: enclosure.mime_type,
pub_date: item
.pub_date
.and_then(|s| DateTime::parse_from_rfc2822(&s).ok())
.map(|s| s.timestamp_millis()),
})
} else {
Err(ParseError::MimeError {
expected: String::from(BITTORRENT_MIME_TYPE),
found: mime_type,
desc: String::from("MikanRssItem"),
})
}
}
}
pub async fn parse_mikan_rss_items_from_rss_link(
url: impl IntoUrl,
) -> eyre::Result<impl Iterator<Item = MikanRssItem>> {
let bytes = download_bytes(url).await?;
let channel = rss::Channel::read_from(&bytes[..])?;
Ok(channel.items.into_iter().flat_map(MikanRssItem::try_from))
}
#[cfg(test)]
mod tests {
use super::parse_mikan_rss_items_from_rss_link;
use crate::downloaders::defs::BITTORRENT_MIME_TYPE;
#[tokio::test]
pub async fn test_mikan_subscription_items_from_rss_url() {
let url = "https://mikanani.me/RSS/Bangumi?bangumiId=3141&subgroupid=370";
let items = parse_mikan_rss_items_from_rss_link(url)
.await
.expect("should get subscription items from rss url")
.collect::<Vec<_>>();
let first_sub_item = items
.first()
.expect("mikan subscriptions should have at least one subs");
assert_eq!(first_sub_item.mime, BITTORRENT_MIME_TYPE);
let homepage = first_sub_item
.homepage
.as_ref()
.expect("mikan subscription item should have home page");
assert!(homepage.starts_with("https://mikanani.me/Home/Episode"));
let name = first_sub_item.title.as_str();
assert!(name.contains("葬送的芙莉莲"));
}
}

View File

@ -1,5 +0,0 @@
pub mod mikan_ep_parser;
pub mod mikan_rss_parser;
pub use mikan_ep_parser::{parse_episode_meta_from_mikan_homepage, MikanEpisodeMeta};
pub use mikan_rss_parser::{parse_mikan_rss_items_from_rss_link, MikanRssItem};

View File

@ -1,3 +0,0 @@
pub mod raw_ep_parser;
pub use raw_ep_parser::{parse_episode_meta_from_raw_name, RawEpisodeMeta};

View File

@ -1 +0,0 @@
mod torrent_ep_parser;

View File

@ -1 +0,0 @@
pub mod torrent_path;

View File

@ -1,79 +0,0 @@
use std::collections::HashSet;
use quirks_path::{Path, PathBuf};
use crate::{
downloaders::defs::Torrent,
models::{bangumi, subscribers},
parsers::defs::SEASON_REGEX,
};
pub fn check_files(info: &Torrent) -> (Vec<PathBuf>, Vec<PathBuf>) {
let mut media_list = vec![];
let mut subtitle_list = vec![];
for f in info.iter_files() {
let file_name = PathBuf::from(f.get_name());
let extension = file_name.extension().unwrap_or_default().to_lowercase();
match extension.as_str() {
".mp4" | ".mkv" => {
media_list.push(file_name);
}
".ass" | ".srt" => subtitle_list.push(file_name),
_ => {}
}
}
(media_list, subtitle_list)
}
pub fn path_to_bangumi<'a>(
save_path: &'a Path,
downloader_path: &'a Path,
) -> Option<(&'a str, i32)> {
let downloader_parts = downloader_path
.components()
.map(|s| s.as_str())
.collect::<HashSet<_>>();
let mut season = None;
let mut bangumi_name = None;
for part in save_path.components().map(|s| s.as_str()) {
if let Some(match_result) = SEASON_REGEX.captures(part) {
season = Some(
match_result
.get(2)
.unwrap_or_else(|| unreachable!("must have a season"))
.as_str()
.parse::<i32>()
.unwrap_or_else(|e| unreachable!("{}", e.to_string())),
);
} else if !downloader_parts.contains(part) {
bangumi_name = Some(part);
}
}
match (season, bangumi_name) {
(Some(season), Some(bangumi_name)) => Some((bangumi_name, season)),
_ => None,
}
}
pub fn file_depth(path: &Path) -> usize {
path.components().count()
}
pub fn is_ep(path: &Path) -> bool {
file_depth(path) <= 2
}
pub fn gen_bangumi_sub_path(data: &bangumi::Model) -> PathBuf {
PathBuf::from(data.official_title.to_string()).join(format!("Season {}", data.season))
}
pub fn rule_name(bgm: &bangumi::Model, conf: &subscribers::SubscriberBangumiConfig) -> String {
if let (Some(true), Some(group_name)) = (conf.leading_group_tag, &bgm.fansub) {
format!("[{}] {} S{}", group_name, bgm.official_title, bgm.season)
} else {
format!("{} S{}", bgm.official_title, bgm.season)
}
}

View File

@ -12,19 +12,18 @@ pub struct SubscriptionWorkerArgs {
pub subscription: subscriptions::Model,
}
impl worker::AppWorker<SubscriptionWorkerArgs> for SubscriptionWorker {
#[async_trait]
impl BackgroundWorker<SubscriptionWorkerArgs> for SubscriptionWorker {
fn build(ctx: &AppContext) -> Self {
Self { ctx: ctx.clone() }
}
}
#[async_trait]
impl worker::Worker<SubscriptionWorkerArgs> for SubscriptionWorker {
async fn perform(&self, args: SubscriptionWorkerArgs) -> worker::Result<()> {
async fn perform(&self, _args: SubscriptionWorkerArgs) -> Result<()> {
println!("================================================");
let db = &self.ctx.db;
let storage = &self.ctx.storage;
let _db = &self.ctx.db;
let _storage = &self.ctx.storage;
println!("================================================");
Ok(())

17
crates/torrent/.gitignore vendored Normal file
View File

@ -0,0 +1,17 @@
**/config/local.yaml
**/config/*.local.yaml
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb

38
crates/torrent/Cargo.toml Normal file
View File

@ -0,0 +1,38 @@
[package]
name = "torrent"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
name = "torrent"
path = "src/lib.rs"
[features]
default = []
testcontainers = []
[dependencies]
async-trait = "0.1.83"
chrono = "0.4.39"
eyre = "0.6.12"
futures = "0.3.31"
itertools = "0.13.0"
lazy_static = "1.5.0"
librqbit-core = "4"
qbit-rs = { git = "https://github.com/lonelyhentxi/qbit.git", rev = "a2c70aa", features = [
"default",
"builder",
] }
regex = "1.11.1"
serde = "1.0.216"
thiserror = "2.0.9"
tokio = "1.42.0"
url = "2.5.4"
quirks_path = { path = "../quirks-path" }
reqwest = "0.12.11"
bytes = "1.9.0"
[dev-dependencies]
testcontainers = { version = "0.23.1" }
testcontainers-modules = { version = "0.11.4" }

View File

@ -1,22 +1,29 @@
use bytes::Bytes;
use itertools::Itertools;
use lazy_static::lazy_static;
use librqbit_core::{
magnet::Magnet,
torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Owned},
};
pub use qbit_rs::model::{
Torrent as QbitTorrent, TorrentContent as QbitTorrentContent,
TorrentFilter as QbitTorrentFilter, TorrentSource as QbitTorrentSource,
};
use quirks_path::{Path, PathBuf};
use regex::Regex;
use reqwest::IntoUrl;
use serde::{Deserialize, Serialize};
use url::Url;
use crate::downloaders::{bytes::download_bytes, error::DownloaderError};
use crate::{QbitTorrent, QbitTorrentContent, TorrentDownloadError};
pub const BITTORRENT_MIME_TYPE: &str = "application/x-bittorrent";
pub const MAGNET_SCHEMA: &str = "magnet";
pub const DEFAULT_USER_AGENT: &str = "Wget/1.13.4 (linux-gnu)";
pub const DEFAULT_TORRENT_USER_AGENT: &str = "Wget/1.13.4 (linux-gnu)";
async fn download_torrent_file<T: IntoUrl>(url: T) -> eyre::Result<Bytes> {
let request_client = reqwest::Client::builder()
.user_agent(DEFAULT_TORRENT_USER_AGENT)
.build()?;
let bytes = request_client.get(url).send().await?.bytes().await?;
Ok(bytes)
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
@ -34,24 +41,6 @@ pub enum TorrentFilter {
Errored,
}
impl From<TorrentFilter> for QbitTorrentFilter {
fn from(val: TorrentFilter) -> Self {
match val {
TorrentFilter::All => QbitTorrentFilter::All,
TorrentFilter::Downloading => QbitTorrentFilter::Downloading,
TorrentFilter::Completed => QbitTorrentFilter::Completed,
TorrentFilter::Paused => QbitTorrentFilter::Paused,
TorrentFilter::Active => QbitTorrentFilter::Active,
TorrentFilter::Inactive => QbitTorrentFilter::Inactive,
TorrentFilter::Resumed => QbitTorrentFilter::Resumed,
TorrentFilter::Stalled => QbitTorrentFilter::Stalled,
TorrentFilter::StalledUploading => QbitTorrentFilter::StalledUploading,
TorrentFilter::StalledDownloading => QbitTorrentFilter::StalledDownloading,
TorrentFilter::Errored => QbitTorrentFilter::Errored,
}
}
}
lazy_static! {
static ref TORRENT_HASH_RE: Regex = Regex::new(r"[a-fA-F0-9]{40}").unwrap();
static ref TORRENT_EXT_RE: Regex = Regex::new(r"\.torrent$").unwrap();
@ -90,19 +79,19 @@ impl TorrentSource {
) {
TorrentSource::from_torrent_url(url, match_hash.as_str().to_string())?
} else {
let contents = download_bytes(url).await?;
let contents = download_torrent_file(url).await?;
TorrentSource::from_torrent_file(contents.to_vec(), Some(basename.to_string()))?
}
} else {
let contents = download_bytes(url).await?;
let contents = download_torrent_file(url).await?;
TorrentSource::from_torrent_file(contents.to_vec(), None)?
};
Ok(source)
}
pub fn from_torrent_file(file: Vec<u8>, name: Option<String>) -> eyre::Result<Self> {
let torrent: TorrentMetaV1Owned =
torrent_from_bytes(&file).map_err(|_| DownloaderError::InvalidTorrentFileFormat)?;
let torrent: TorrentMetaV1Owned = torrent_from_bytes(&file)
.map_err(|_| TorrentDownloadError::InvalidTorrentFileFormat)?;
let hash = torrent.info_hash.as_string();
Ok(TorrentSource::TorrentFile {
torrent: file,
@ -113,17 +102,24 @@ impl TorrentSource {
pub fn from_magnet_url(url: Url) -> eyre::Result<Self> {
if url.scheme() != MAGNET_SCHEMA {
Err(DownloaderError::InvalidUrlSchema {
Err(TorrentDownloadError::InvalidUrlSchema {
found: url.scheme().to_string(),
expected: MAGNET_SCHEMA.to_string(),
}
.into())
} else {
let magnet =
Magnet::parse(url.as_str()).map_err(|_| DownloaderError::InvalidMagnetFormat {
let magnet = Magnet::parse(url.as_str()).map_err(|_| {
TorrentDownloadError::InvalidMagnetFormat {
url: url.as_str().to_string(),
})?;
let hash = magnet.info_hash.as_string();
}
})?;
let hash = magnet
.as_id20()
.ok_or_else(|| TorrentDownloadError::InvalidMagnetFormat {
url: url.as_str().to_string(),
})?
.as_string();
Ok(TorrentSource::MagnetUrl { url, hash })
}
}
@ -141,22 +137,6 @@ impl TorrentSource {
}
}
impl From<TorrentSource> for QbitTorrentSource {
fn from(value: TorrentSource) -> Self {
match value {
TorrentSource::MagnetUrl { url, .. } => QbitTorrentSource::Urls {
urls: qbit_rs::model::Sep::from([url]),
},
TorrentSource::TorrentUrl { url, .. } => QbitTorrentSource::Urls {
urls: qbit_rs::model::Sep::from([url]),
},
TorrentSource::TorrentFile {
torrent: torrents, ..
} => QbitTorrentSource::TorrentFiles { torrents },
}
}
}
pub trait TorrentContent {
fn get_name(&self) -> &str;
@ -247,3 +227,43 @@ impl Torrent {
}
}
}
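/// Backend-agnostic torrent client operations: query, add, delete, rename and
/// move torrents, manage categories and tags, and resolve save paths.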
#[async_trait::async_trait]
pub trait TorrentDownloader {
async fn get_torrents_info(
&self,
status_filter: TorrentFilter,
category: Option<String>,
tag: Option<String>,
) -> eyre::Result<Vec<Torrent>>;
async fn add_torrents(
&self,
source: TorrentSource,
save_path: String,
category: Option<&str>,
) -> eyre::Result<()>;
async fn delete_torrents(&self, hashes: Vec<String>) -> eyre::Result<()>;
async fn rename_torrent_file(
&self,
hash: &str,
old_path: &str,
new_path: &str,
) -> eyre::Result<()>;
async fn move_torrents(&self, hashes: Vec<String>, new_path: &str) -> eyre::Result<()>;
async fn get_torrent_path(&self, hashes: String) -> eyre::Result<Option<String>>;
async fn check_connection(&self) -> eyre::Result<()>;
async fn set_torrents_category(&self, hashes: Vec<String>, category: &str) -> eyre::Result<()>;
async fn add_torrent_tags(&self, hashes: Vec<String>, tags: Vec<String>) -> eyre::Result<()>;
async fn add_category(&self, category: &str) -> eyre::Result<()>;
fn get_save_path(&self, sub_path: &Path) -> PathBuf;
}

View File

@ -1,9 +1,9 @@
use std::borrow::Cow;
use std::{borrow::Cow, time::Duration};
use thiserror::Error;
use std::time::Duration;
#[derive(Error, Debug)]
pub enum DownloaderError {
pub enum TorrentDownloadError {
#[error("Invalid mime (expected {expected:?}, got {found:?})")]
InvalidMime { expected: String, found: String },
#[error("Invalid url schema (expected {expected:?}, got {found:?})")]

11
crates/torrent/src/lib.rs Normal file
View File

@ -0,0 +1,11 @@
pub mod core;
pub mod error;
pub mod qbit;
pub use core::{Torrent, TorrentContent, TorrentDownloader, TorrentFilter, TorrentSource};
pub use error::TorrentDownloadError;
pub use qbit::{
QBittorrentDownloader, QBittorrentDownloaderCreation, QbitTorrent, QbitTorrentContent,
QbitTorrentFile, QbitTorrentFilter, QbitTorrentSource,
};

View File

@ -1,14 +1,13 @@
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fmt::Debug,
future::Future,
sync::Arc,
time::Duration,
borrow::Cow, collections::HashSet, fmt::Debug, future::Future, sync::Arc, time::Duration,
};
use eyre::OptionExt;
use futures::future::try_join_all;
pub use qbit_rs::model::{
Torrent as QbitTorrent, TorrentContent as QbitTorrentContent, TorrentFile as QbitTorrentFile,
TorrentFilter as QbitTorrentFilter, TorrentSource as QbitTorrentSource,
};
use qbit_rs::{
model::{AddTorrentArg, Credential, GetTorrentListArg, NonEmptyStr, SyncData},
Qbit,
@ -17,18 +16,55 @@ use quirks_path::{path_equals_as_file_url, Path, PathBuf};
use tokio::time::sleep;
use url::Url;
use super::{
defs::{Torrent, TorrentFilter, TorrentSource},
error::DownloaderError,
torrent_downloader::TorrentDownloader,
};
use crate::{
downloaders::defs::{QbitTorrent, QbitTorrentContent, TorrentContent},
models::{entities::downloaders, prelude::DownloaderCategory},
};
use crate::{Torrent, TorrentDownloadError, TorrentDownloader, TorrentFilter, TorrentSource};
pub struct SyncDataCache {
pub torrents: HashMap<String, QbitTorrent>,
impl From<TorrentSource> for QbitTorrentSource {
fn from(value: TorrentSource) -> Self {
match value {
TorrentSource::MagnetUrl { url, .. } => QbitTorrentSource::Urls {
urls: qbit_rs::model::Sep::from([url]),
},
TorrentSource::TorrentUrl { url, .. } => QbitTorrentSource::Urls {
urls: qbit_rs::model::Sep::from([url]),
},
TorrentSource::TorrentFile {
torrent: torrents,
name,
..
} => QbitTorrentSource::TorrentFiles {
torrents: vec![QbitTorrentFile {
filename: name.unwrap_or_default(),
data: torrents,
}],
},
}
}
}
impl From<TorrentFilter> for QbitTorrentFilter {
fn from(val: TorrentFilter) -> Self {
match val {
TorrentFilter::All => QbitTorrentFilter::All,
TorrentFilter::Downloading => QbitTorrentFilter::Downloading,
TorrentFilter::Completed => QbitTorrentFilter::Completed,
TorrentFilter::Paused => QbitTorrentFilter::Paused,
TorrentFilter::Active => QbitTorrentFilter::Active,
TorrentFilter::Inactive => QbitTorrentFilter::Inactive,
TorrentFilter::Resumed => QbitTorrentFilter::Resumed,
TorrentFilter::Stalled => QbitTorrentFilter::Stalled,
TorrentFilter::StalledUploading => QbitTorrentFilter::StalledUploading,
TorrentFilter::StalledDownloading => QbitTorrentFilter::StalledDownloading,
TorrentFilter::Errored => QbitTorrentFilter::Errored,
}
}
}
pub struct QBittorrentDownloaderCreation {
pub endpoint: String,
pub username: String,
pub password: String,
pub save_path: String,
pub subscriber_id: i32,
}
pub struct QBittorrentDownloader {
@ -40,32 +76,26 @@ pub struct QBittorrentDownloader {
}
impl QBittorrentDownloader {
pub async fn from_downloader_model(model: downloaders::Model) -> Result<Self, DownloaderError> {
if model.category != DownloaderCategory::QBittorrent {
return Err(DownloaderError::InvalidMime {
expected: DownloaderCategory::QBittorrent.to_string(),
found: model.category.to_string(),
});
}
let endpoint_url = model
.endpoint_url()
.map_err(DownloaderError::InvalidUrlParse)?;
let credential = Credential::new(model.username, model.password);
pub async fn from_creation(
creation: QBittorrentDownloaderCreation,
) -> Result<Self, TorrentDownloadError> {
let endpoint_url =
Url::parse(&creation.endpoint).map_err(TorrentDownloadError::InvalidUrlParse)?;
let credential = Credential::new(creation.username, creation.password);
let client = Qbit::new(endpoint_url.clone(), credential);
client
.login(false)
.await
.map_err(DownloaderError::QBitAPIError)?;
.map_err(TorrentDownloadError::QBitAPIError)?;
client.sync(None).await?.rid;
Ok(Self {
client: Arc::new(client),
endpoint_url,
subscriber_id: model.subscriber_id,
save_path: model.save_path.into(),
subscriber_id: creation.subscriber_id,
save_path: creation.save_path.into(),
wait_sync_timeout: Duration::from_millis(10000),
})
}
@ -102,7 +132,7 @@ impl QBittorrentDownloader {
if stop_wait_fn(sync_data) {
break;
} else {
return Err(DownloaderError::TimeoutError {
return Err(TorrentDownloadError::TimeoutError {
action: Cow::Borrowed("QBittorrentDownloader::wait_unit"),
timeout,
}
@ -274,8 +304,7 @@ impl TorrentDownloader for QBittorrentDownloader {
hash,
|contents| -> bool {
contents.iter().any(|c| {
path_equals_as_file_url(save_path.join(c.get_name()), &new_path)
.unwrap_or(false)
path_equals_as_file_url(save_path.join(&c.name), &new_path).unwrap_or(false)
})
},
None,
@ -467,11 +496,7 @@ pub mod tests {
async fn test_qbittorrent_downloader_impl() {
let base_save_path = Path::new(get_tmp_qbit_test_folder());
let downloader = QBittorrentDownloader::from_downloader_model(downloaders::Model {
created_at: Default::default(),
updated_at: Default::default(),
id: 0,
category: DownloaderCategory::QBittorrent,
let downloader = QBittorrentDownloader::from_creation(QBittorrentDownloaderCreation {
endpoint: "http://localhost:8080".to_string(),
password: "".to_string(),
username: "".to_string(),
@ -489,8 +514,8 @@ pub mod tests {
.unwrap();
let torrent_source = TorrentSource::parse(
"https://mikanani.me/Download/20240301/47ee2d69e7f19af783ad896541a07b012676f858.torrent"
).await.unwrap();
"https://mikanani.me/Download/20240301/47ee2d69e7f19af783ad896541a07b012676f858.torrent"
).await.unwrap();
let save_path = base_save_path.join(format!(
"test_add_torrents_{}",

View File

@ -4,10 +4,14 @@ set dotenv-load
prepare-dev-recorder:
cargo install loco-cli
cargo install sea-orm-cli
cargo install cargo-watch
dev-recorder:
cargo watch -w crates/recorder -w config -x 'recorder start'
down-recorder:
cargo run -p recorder --bin recorder_cli -- db down 999 --environment recorder.development
play-recorder:
cargo recorder-playground