diff --git a/Cargo.lock b/Cargo.lock index 6df39b5..cc794ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -547,9 +547,9 @@ checksum = "0d75b8252ed252f881d1dc4482ae3c3854df6ee8183c1906bac50ff358f4f89f" [[package]] name = "bumpalo" -version = "3.15.2" +version = "3.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3b1be7772ee4501dba05acbe66bb1e8760f6a6c474a36035631638e4415f130" +checksum = "8ea184aa71bb362a1157c896979544cc23974e08fd265f29ea96b59f0b4a555b" [[package]] name = "byte-unit" @@ -660,7 +660,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.1", + "windows-targets 0.52.3", ] [[package]] @@ -2808,8 +2808,10 @@ version = "0.1.0" dependencies = [ "async-trait", "axum", + "bytes", "chrono", "eyre", + "futures", "include_dir", "insta", "loco-rs", @@ -3835,12 +3837,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -4462,9 +4464,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0da193277a4e2c33e59e09b5861580c33dd0a637c3883d0fa74ba40c0374af2e" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ "async-compression", "bitflags 2.4.2", @@ -4966,7 +4968,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.1", + "windows-targets 0.52.3", ] [[package]] @@ -4984,7 +4986,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.1", + "windows-targets 0.52.3", ] [[package]] @@ -5004,17 +5006,17 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.1" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56eb995bed789027c5fa9be885994e944989ce9f1b02956b4bf8744c3dbf449b" +checksum = "d380ba1dc7187569a8a9e91ed34b8ccfc33123bbacb8c0aed2d1ad7f3ef2dc5f" dependencies = [ - "windows_aarch64_gnullvm 0.52.1", - "windows_aarch64_msvc 0.52.1", - "windows_i686_gnu 0.52.1", - "windows_i686_msvc 0.52.1", - "windows_x86_64_gnu 0.52.1", - "windows_x86_64_gnullvm 0.52.1", - "windows_x86_64_msvc 0.52.1", + "windows_aarch64_gnullvm 0.52.3", + "windows_aarch64_msvc 0.52.3", + "windows_i686_gnu 0.52.3", + "windows_i686_msvc 0.52.3", + "windows_x86_64_gnu 0.52.3", + "windows_x86_64_gnullvm 0.52.3", + "windows_x86_64_msvc 0.52.3", ] [[package]] @@ -5025,9 +5027,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.1" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7269c1442e75af9fa59290383f7665b828efc76c429cc0b7f2ecb33cf51ebae" +checksum = "68e5dcfb9413f53afd9c8f86e56a7b4d86d9a2fa26090ea2dc9e40fba56c6ec6" [[package]] name = "windows_aarch64_msvc" @@ -5037,9 +5039,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.1" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f70ab2cebf332b7ecbdd98900c2da5298a8c862472fb35c75fc297eabb9d89b8" +checksum = "8dab469ebbc45798319e69eebf92308e541ce46760b49b18c6b3fe5e8965b30f" [[package]] name = "windows_i686_gnu" @@ -5049,9 +5051,9 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.1" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "679f235acf6b1639408c0f6db295697a19d103b0cdc88146aa1b992c580c647d" +checksum = "2a4e9b6a7cac734a8b4138a4e1044eac3404d8326b6c0f939276560687a033fb" [[package]] name = "windows_i686_msvc" @@ -5061,9 +5063,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.1" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3480ac194b55ae274a7e135c21645656825da4a7f5b6e9286291b2113c94a78b" +checksum = "28b0ec9c422ca95ff34a78755cfa6ad4a51371da2a5ace67500cf7ca5f232c58" [[package]] name = "windows_x86_64_gnu" @@ -5073,9 +5075,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.1" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42c46bab241c121402d1cb47d028ea3680ee2f359dcc287482dcf7fdddc73363" +checksum = "704131571ba93e89d7cd43482277d6632589b18ecf4468f591fbae0a8b101614" [[package]] name = "windows_x86_64_gnullvm" @@ -5085,9 +5087,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.1" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc885a4332ee1afb9a1bacf11514801011725570d35675abc229ce7e3afe4d20" +checksum = "42079295511643151e98d61c38c0acc444e52dd42ab456f7ccfd5152e8ecf21c" [[package]] name = "windows_x86_64_msvc" @@ -5097,9 +5099,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.1" +version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e440c60457f84b0bee09208e62acc7ade264b38c4453f6312b8c9ab1613e73c" +checksum = "0770833d60a970638e989b3fa9fd2bb1aaadcf88963d1659fd7d9990196ed2d6" [[package]] name = "winnow" diff --git a/config/recorder.development.yaml b/config/recorder.development.yaml index 7099861..204a422 100644 --- a/config/recorder.development.yaml +++ b/config/recorder.development.yaml @@ -86,7 +86,7 @@ mailer: # Database Configuration database: # Database connection URI - uri: '{{ get_env(name="DATABASE_URL", default="postgres://konobangu:konobangu@localhost:5432/konobangu") }}' + uri: '{{ get_env(name="DATABASE_URL", default="postgres://konobangu:konobangu@127.0.0.1:5432/konobangu") }}' # When enabled, the sql query will be logged. enable_logging: false # Set the timeout duration when acquiring a connection. @@ -107,7 +107,7 @@ database: # Redis Configuration redis: # Redis connection URI - uri: '{{ get_env(name="REDIS_URL", default="redis://127.0.0.1") }}' + uri: '{{ get_env(name="REDIS_URL", default="redis://127.0.0.1:6379") }}' # Dangerously flush all data in Redis on startup. dangerous operation, make sure that you using this flag only on dev environments or test mode dangerously_flush: false diff --git a/crates/recorder/Cargo.toml b/crates/recorder/Cargo.toml index 73a7d44..58a19bd 100644 --- a/crates/recorder/Cargo.toml +++ b/crates/recorder/Cargo.toml @@ -33,6 +33,8 @@ sea-orm-migration = { version = "1.0.0-rc.1", features = [ reqwest = "0.11.24" thiserror = "1.0.57" rss = "2.0.7" +bytes = "1.5.0" +futures = "0.3.30" [lib] name = "recorder" diff --git a/crates/recorder/examples/playground.rs b/crates/recorder/examples/playground.rs index 806942d..0649c58 100644 --- a/crates/recorder/examples/playground.rs +++ b/crates/recorder/examples/playground.rs @@ -1,7 +1,6 @@ use eyre::Context; #[allow(unused_imports)] use loco_rs::{cli::playground, prelude::*}; -use recorder::app::App; async fn fetch_and_parse_rss_demo () -> eyre::Result<()> { let url = @@ -15,7 +14,6 @@ async fn fetch_and_parse_rss_demo () -> eyre::Result<()> { #[tokio::main] async fn main() -> eyre::Result<()> { - fetch_and_parse_rss_demo().await?; // let active_model: articles::ActiveModel = ActiveModel { diff --git a/crates/recorder/src/app.rs b/crates/recorder/src/app.rs index f227de5..84ea117 100644 --- a/crates/recorder/src/app.rs +++ b/crates/recorder/src/app.rs @@ -19,6 +19,7 @@ use crate::{ }; pub struct App; + #[async_trait] impl Hooks for App { fn app_name() -> &'static str { diff --git a/crates/recorder/src/downloader/bytes.rs b/crates/recorder/src/downloader/bytes.rs new file mode 100644 index 0000000..fc0800d --- /dev/null +++ b/crates/recorder/src/downloader/bytes.rs @@ -0,0 +1,6 @@ +use bytes::Bytes; + +pub async fn download_bytes (url: &str) -> eyre::Result { + let bytes = reqwest::get(url).await?.bytes().await?; + Ok(bytes) +} \ No newline at end of file diff --git a/crates/recorder/src/downloader/defs.rs b/crates/recorder/src/downloader/defs.rs new file mode 100644 index 0000000..a1ba34b --- /dev/null +++ b/crates/recorder/src/downloader/defs.rs @@ -0,0 +1 @@ +pub const BITTORRENT_MIME_TYPE: &str = "application/x-bittorrent"; \ No newline at end of file diff --git a/crates/recorder/src/downloader/mod.rs b/crates/recorder/src/downloader/mod.rs index 9bb2c04..d851447 100644 --- a/crates/recorder/src/downloader/mod.rs +++ b/crates/recorder/src/downloader/mod.rs @@ -1,2 +1,4 @@ pub mod aria; pub mod qbitorrent; +pub mod defs; +pub mod bytes; diff --git a/crates/recorder/src/lib.rs b/crates/recorder/src/lib.rs index a55e281..8d0f1d4 100644 --- a/crates/recorder/src/lib.rs +++ b/crates/recorder/src/lib.rs @@ -3,7 +3,6 @@ pub mod controllers; pub mod downloader; pub mod migrations; pub mod models; -pub mod rss; pub mod subscriptions; pub mod tasks; pub mod views; diff --git a/crates/recorder/src/migrations/defs.rs b/crates/recorder/src/migrations/defs.rs index 94ad6b2..aa2c999 100644 --- a/crates/recorder/src/migrations/defs.rs +++ b/crates/recorder/src/migrations/defs.rs @@ -1,6 +1,19 @@ -use sea_orm_migration::prelude::*; +use std::{collections::HashSet}; +use std::fmt::Display; -#[derive(Iden)] +use sea_orm::{DeriveIden, Statement}; +use sea_orm_migration::prelude::*; +use sea_orm_migration::prelude::extension::postgres::IntoTypeRef; + +use crate::migrations::extension::postgres::Type; + +#[derive(DeriveIden)] +pub enum GeneralIds { + CreatedAt, + UpdatedAt, +} + +#[derive(DeriveIden)] pub enum Subscribers { Table, Id, @@ -8,7 +21,7 @@ pub enum Subscribers { DisplayName, } -#[derive(Iden)] +#[derive(DeriveIden)] pub enum Subscriptions { Table, Id, @@ -20,7 +33,7 @@ pub enum Subscriptions { Enabled, } -#[derive(Iden)] +#[derive(DeriveIden)] pub enum Bangumi { Table, Id, @@ -28,13 +41,290 @@ pub enum Bangumi { SubscriptionId, } -#[derive(Iden)] +#[derive(DeriveIden)] pub enum Episodes { Table, Id, DisplayName, BangumiId, - DownloadUrl, - DownloadProgress, OutputName, + DownloadId, +} + +#[derive(DeriveIden)] +pub enum Downloads { + Table, + Id, + SubscriptionId, + OriginalName, + DisplayName, + Status, + CurrSize, + AllSize, + Mime, +} + +#[async_trait::async_trait] +pub trait CustomSchemaManagerExt { + async fn create_postgres_auto_update_ts_fn(&self, col_name: &str) -> Result<(), DbErr>; + async fn create_postgres_auto_update_ts_fn_for_col( + &self, + col: C, + ) -> Result<(), DbErr> { + let column_ident = col.into_iden(); + self.create_postgres_auto_update_ts_fn(&column_ident.to_string()) + .await?; + Ok(()) + } + + async fn create_postgres_auto_update_ts_trigger( + &self, + tab_name: &str, + col_name: &str, + ) -> Result<(), DbErr>; + + async fn create_postgres_auto_update_ts_trigger_for_col< + T: IntoIden + 'static + Send, + C: IntoIden + 'static + Send, + >( + &self, + tab: T, + col: C, + ) -> Result<(), DbErr> { + let column_ident = col.into_iden(); + let table_ident = tab.into_iden(); + self.create_postgres_auto_update_ts_trigger( + &table_ident.to_string(), + &column_ident.to_string(), + ) + .await?; + Ok(()) + } + + async fn drop_postgres_auto_update_ts_fn(&self, col_name: &str) -> Result<(), DbErr>; + + async fn drop_postgres_auto_update_ts_fn_for_col( + &self, + col: C, + ) -> Result<(), DbErr> { + let column_ident = col.into_iden(); + self.drop_postgres_auto_update_ts_fn(&column_ident.to_string()) + .await?; + Ok(()) + } + + async fn drop_postgres_auto_update_ts_trigger( + &self, + tab_name: &str, + col_name: &str, + ) -> Result<(), DbErr>; + + async fn drop_postgres_auto_update_ts_trigger_for_col< + T: IntoIden + 'static + Send, + C: IntoIden + 'static + Send, + >( + &self, + tab: T, + col: C, + ) -> Result<(), DbErr> { + let column_ident = col.into_iden(); + let table_ident = tab.into_iden(); + self.drop_postgres_auto_update_ts_trigger( + &table_ident.to_string(), + &column_ident.to_string(), + ) + .await?; + Ok(()) + } + + async fn create_postgres_enum_for_active_enum< + E: IntoTypeRef + IntoIden + Send + Clone, + T: Display + Send, + I: IntoIterator + Send, + >( + &self, + enum_name: E, + values: I, + ) -> Result<(), DbErr>; + + async fn add_postgres_enum_values_for_active_enum< + E: IntoTypeRef + IntoIden + Send + Clone, + T: Display + Send, + I: IntoIterator + Send, + >( + &self, + enum_name: E, + values: I, + ) -> Result<(), DbErr>; + + async fn drop_postgres_enum_for_active_enum( + &self, + enum_name: E, + ) -> Result<(), DbErr>; + + async fn if_postgres_enum_exists( + &self, + enum_name: E, + ) -> Result; + + async fn get_postgres_enum_values( + &self, + enum_name: E, + ) -> Result, DbErr>; +} + +#[async_trait::async_trait] +impl<'c> CustomSchemaManagerExt for SchemaManager<'c> { + async fn create_postgres_auto_update_ts_fn(&self, col_name: &str) -> Result<(), DbErr> { + let sql = format!( + "CREATE OR REPLACE FUNCTION update_{col_name}_column() RETURNS TRIGGER AS $$ BEGIN \ + NEW.{col_name} = current_timestamp; RETURN NEW; END; $$ language 'plpgsql';" + ); + + self.get_connection() + .execute(Statement::from_string(self.get_database_backend(), sql)) + .await?; + + Ok(()) + } + + async fn create_postgres_auto_update_ts_trigger( + &self, + tab_name: &str, + col_name: &str, + ) -> Result<(), DbErr> { + let sql = format!( + "CREATE OR REPLACE TRIGGER update_{tab_name}_{col_name}_column_trigger BEFORE UPDATE \ + ON {tab_name} FOR EACH ROW EXECUTE PROCEDURE update_{col_name}_column();" + ); + self.get_connection() + .execute(Statement::from_string(self.get_database_backend(), sql)) + .await?; + Ok(()) + } + + async fn drop_postgres_auto_update_ts_fn(&self, col_name: &str) -> Result<(), DbErr> { + let sql = format!("DROP FUNCTION IF EXISTS update_{col_name}_column();"); + self.get_connection() + .execute(Statement::from_string(self.get_database_backend(), sql)) + .await?; + Ok(()) + } + + async fn drop_postgres_auto_update_ts_trigger( + &self, + tab_name: &str, + col_name: &str, + ) -> Result<(), DbErr> { + let sql = format!( + "DROP TRIGGER IF EXISTS update_{tab_name}_{col_name}_column_trigger ON {tab_name};" + ); + self.get_connection() + .execute(Statement::from_string(self.get_database_backend(), sql)) + .await?; + Ok(()) + } + + async fn create_postgres_enum_for_active_enum< + E: IntoTypeRef + IntoIden + Send + Clone, + T: Display + Send, + I: IntoIterator + Send, + >( + &self, + enum_name: E, + values: I, + ) -> Result<(), DbErr> { + let existed = self.if_postgres_enum_exists(enum_name.clone()).await?; + if !existed { + let idents = values + .into_iter() + .map(|v| Alias::new(v.to_string())) + .collect::>(); + self.create_type( + Type::create() + .as_enum(enum_name) + .values(idents) + .to_owned(), + ) + .await?; + } else { + self.add_postgres_enum_values_for_active_enum(enum_name, values) + .await?; + } + Ok(()) + } + + async fn add_postgres_enum_values_for_active_enum< + E: IntoTypeRef + IntoIden + Send + Clone, + T: Display + Send, + I: IntoIterator + Send, + >( + &self, + enum_name: E, + values: I, + ) -> Result<(), DbErr> { + let exists_values = self.get_postgres_enum_values(enum_name.clone()).await?; + let to_add_values = values + .into_iter() + .filter(|v| !exists_values.contains(&v.to_string())) + .collect::>(); + + if to_add_values.is_empty() { + return Ok(()); + } + + let mut type_alter = Type::alter().name(enum_name); + + for v in to_add_values { + type_alter = type_alter.add_value(Alias::new(v.to_string())); + } + + self.alter_type(type_alter.to_owned()).await?; + Ok(()) + } + + async fn drop_postgres_enum_for_active_enum( + &self, + enum_name: E, + ) -> Result<(), DbErr> { + self.drop_type(Type::drop().name(enum_name).to_owned()) + .await?; + Ok(()) + } + + async fn if_postgres_enum_exists( + &self, + enum_name: E, + ) -> Result { + let enum_name: String = enum_name.into_iden().to_string(); + let sql = format!("SELECT 1 FROM pg_type WHERE typname = '{enum_name}'"); + let result = self + .get_connection() + .query_one(Statement::from_string(self.get_database_backend(), sql)) + .await?; + Ok(result.is_some()) + } + + async fn get_postgres_enum_values( + &self, + enum_name: E, + ) -> Result, DbErr> { + let enum_name: String = enum_name.into_iden().to_string(); + let sql = format!( + "SELECT pg_enum.enumlabel AS enumlabel FROM pg_type JOIN pg_enum ON pg_enum.enumtypid \ + = pg_type.oid WHERE pg_type.typname = '{enum_name}';" + ); + + let results = self + .get_connection() + .query_all(Statement::from_string(self.get_database_backend(), sql)) + .await?; + + let mut items = HashSet::new(); + for r in results { + items.insert(r.try_get::("", "enumlabel")?); + } + + Ok(items) + } } diff --git a/crates/recorder/src/migrations/m20220101_000001_init.rs b/crates/recorder/src/migrations/m20220101_000001_init.rs index 3fac885..dc23580 100644 --- a/crates/recorder/src/migrations/m20220101_000001_init.rs +++ b/crates/recorder/src/migrations/m20220101_000001_init.rs @@ -1,8 +1,9 @@ -use sea_orm::sea_query::extension::postgres::Type; use sea_orm_migration::{prelude::*, schema::*}; -use super::defs::{Bangumi, Episodes, Subscribers, Subscriptions}; -use crate::models::subscribers::ROOT_SUBSCRIBER; +use super::defs::{ + Bangumi, CustomSchemaManagerExt, Episodes, GeneralIds, Subscribers, Subscriptions, +}; +use crate::models::{subscribers::ROOT_SUBSCRIBER, subscriptions}; #[derive(DeriveMigrationName)] pub struct Migration; @@ -10,6 +11,9 @@ pub struct Migration; #[async_trait::async_trait] impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_postgres_auto_update_ts_fn_for_col(GeneralIds::UpdatedAt) + .await?; manager .create_table( table_auto(Subscribers::Table) @@ -19,6 +23,12 @@ impl MigrationTrait for Migration { .to_owned(), ) .await?; + manager + .create_postgres_auto_update_ts_trigger_for_col( + Subscribers::Table, + GeneralIds::UpdatedAt, + ) + .await?; let insert = Query::insert() .into_table(Subscribers::Table) @@ -28,15 +38,13 @@ impl MigrationTrait for Migration { manager.exec_stmt(insert).await?; manager - .create_type( - Type::create() - .as_enum(Alias::new("subscription_category")) - .values([ - Alias::new("mikan"), - Alias::new("manual"), - Alias::new("bangumi"), - ]) - .to_owned(), + .create_postgres_enum_for_active_enum( + subscriptions::SubscriptionCategoryEnum, + &[ + subscriptions::SubscriptionCategory::Mikan, + subscriptions::SubscriptionCategory::Manual, + subscriptions::SubscriptionCategory::Bangumi, + ], ) .await?; @@ -49,9 +57,14 @@ impl MigrationTrait for Migration { .col(text(Subscriptions::SourceUrl)) .col(boolean(Subscriptions::Aggregate)) .col(boolean(Subscriptions::Enabled)) + .col(enumeration( + Subscriptions::Category, + subscriptions::SubscriptionCategoryEnum, + subscriptions::SubscriptionCategory::iden_values(), + )) .foreign_key( ForeignKey::create() - .name("subscription_subscriber_id") + .name("fk_subscription_subscriber_id") .from(Subscriptions::Table, Subscriptions::SubscriberId) .to(Subscribers::Table, Subscribers::Id), ) @@ -59,6 +72,13 @@ impl MigrationTrait for Migration { ) .await?; + manager + .create_postgres_auto_update_ts_trigger_for_col( + Subscriptions::Table, + GeneralIds::UpdatedAt, + ) + .await?; + manager .create_table( table_auto(Bangumi::Table) @@ -67,7 +87,7 @@ impl MigrationTrait for Migration { .col(integer(Bangumi::SubscriptionId)) .foreign_key( ForeignKey::create() - .name("bangumi_subscription_id") + .name("fk_bangumi_subscription_id") .from(Bangumi::Table, Bangumi::SubscriptionId) .to(Subscriptions::Table, Subscriptions::Id), ) @@ -75,18 +95,20 @@ impl MigrationTrait for Migration { ) .await?; + manager + .create_postgres_auto_update_ts_trigger_for_col(Bangumi::Table, GeneralIds::UpdatedAt) + .await?; + manager .create_table( table_auto(Episodes::Table) .col(pk_auto(Episodes::Id)) .col(text(Episodes::DisplayName)) .col(integer(Episodes::BangumiId)) - .col(text(Episodes::DownloadUrl)) - .col(tiny_integer(Episodes::DownloadProgress).default(0)) .col(text(Episodes::OutputName)) .foreign_key( ForeignKey::create() - .name("episode_bangumi_id") + .name("fk_episode_bangumi_id") .from(Episodes::Table, Episodes::BangumiId) .to(Bangumi::Table, Bangumi::Id), ) @@ -94,32 +116,50 @@ impl MigrationTrait for Migration { ) .await?; + manager + .create_postgres_auto_update_ts_trigger_for_col(Episodes::Table, GeneralIds::UpdatedAt) + .await?; + Ok(()) } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_postgres_auto_update_ts_trigger_for_col(Episodes::Table, GeneralIds::UpdatedAt) + .await?; manager .drop_table(Table::drop().table(Episodes::Table).to_owned()) .await?; + manager + .drop_postgres_auto_update_ts_trigger_for_col(Bangumi::Table, GeneralIds::UpdatedAt) + .await?; manager .drop_table(Table::drop().table(Bangumi::Table).to_owned()) .await?; + manager + .drop_postgres_auto_update_ts_trigger_for_col( + Subscriptions::Table, + GeneralIds::UpdatedAt, + ) + .await?; manager .drop_table(Table::drop().table(Subscriptions::Table).to_owned()) .await?; manager - .drop_type( - Type::drop() - .name(Alias::new("subscription_category")) - .to_owned(), - ) + .drop_postgres_auto_update_ts_trigger_for_col(Subscribers::Table, GeneralIds::UpdatedAt) .await?; manager .drop_table(Table::drop().table(Subscribers::Table).to_owned()) - .await + .await?; + + manager + .drop_postgres_enum_for_active_enum(subscriptions::SubscriptionCategoryEnum) + .await?; + + Ok(()) } } diff --git a/crates/recorder/src/migrations/m20240224_082543_add_downloads.rs b/crates/recorder/src/migrations/m20240224_082543_add_downloads.rs new file mode 100644 index 0000000..f59028f --- /dev/null +++ b/crates/recorder/src/migrations/m20240224_082543_add_downloads.rs @@ -0,0 +1,116 @@ +use loco_rs::schema::table_auto; +use sea_orm_migration::{prelude::*, schema::*}; + +use super::defs::*; +use crate::models::prelude::{DownloadMime, DownloadStatus}; +use crate::models::prelude::downloads::{DownloadMimeEnum, DownloadStatusEnum}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_postgres_enum_for_active_enum( + DownloadMimeEnum, + &[DownloadMime::OctetStream, DownloadMime::BitTorrent], + ) + .await?; + + manager + .create_postgres_enum_for_active_enum( + DownloadStatusEnum, + &[ + DownloadStatus::Pending, + DownloadStatus::Downloading, + DownloadStatus::Completed, + DownloadStatus::Failed, + DownloadStatus::Deleted, + DownloadStatus::Paused, + ], + ) + .await?; + + manager + .create_table( + table_auto(Downloads::Table) + .col(pk_auto(Downloads::Id)) + .col(string(Downloads::OriginalName)) + .col(string(Downloads::DisplayName)) + .col(integer(Downloads::SubscriptionId)) + .col(enumeration( + Downloads::Status, + DownloadStatusEnum, + DownloadMime::iden_values(), + )) + .col(enumeration( + Downloads::Mime, + DownloadMimeEnum, + DownloadMime::iden_values(), + )) + .col(big_unsigned(Downloads::AllSize)) + .col(big_unsigned(Downloads::CurrSize)) + .foreign_key( + ForeignKey::create() + .name("fk_download_subscription_id") + .from(Downloads::Table, Downloads::SubscriptionId) + .to(Subscriptions::Table, Subscriptions::Id), + ) + .to_owned(), + ) + .await?; + + manager + .create_postgres_auto_update_ts_fn_for_col(GeneralIds::UpdatedAt) + .await?; + + manager + .alter_table( + Table::alter() + .table(Episodes::Table) + .add_column_if_not_exists(integer(Episodes::DownloadId)) + .add_foreign_key( + TableForeignKey::new() + .name("fk_episode_download_id") + .from_tbl(Episodes::Table) + .from_col(Episodes::DownloadId) + .to_tbl(Downloads::Table) + .to_col(Downloads::Id), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Episodes::Table) + .drop_foreign_key(Alias::new("fk_episode_download_id")) + .drop_column(Episodes::DownloadId) + .to_owned(), + ) + .await?; + + manager + .drop_postgres_auto_update_ts_fn_for_col(GeneralIds::UpdatedAt) + .await?; + + manager + .drop_table(Table::drop().table(Downloads::Table).to_owned()) + .await?; + + manager + .drop_postgres_enum_for_active_enum(DownloadMimeEnum) + .await?; + manager + .drop_postgres_enum_for_active_enum(DownloadStatusEnum) + .await?; + + Ok(()) + } +} diff --git a/crates/recorder/src/migrations/mod.rs b/crates/recorder/src/migrations/mod.rs index 99bffc8..04d2ae0 100644 --- a/crates/recorder/src/migrations/mod.rs +++ b/crates/recorder/src/migrations/mod.rs @@ -2,12 +2,16 @@ pub use sea_orm_migration::prelude::*; pub mod defs; pub mod m20220101_000001_init; +pub mod m20240224_082543_add_downloads; pub struct Migrator; #[async_trait::async_trait] impl MigratorTrait for Migrator { fn migrations() -> Vec> { - vec![Box::new(m20220101_000001_init::Migration)] + vec![ + Box::new(m20220101_000001_init::Migration), + Box::new(m20240224_082543_add_downloads::Migration), + ] } } diff --git a/crates/recorder/src/models/_entities/downloads.rs b/crates/recorder/src/models/_entities/downloads.rs new file mode 100644 index 0000000..ed88caf --- /dev/null +++ b/crates/recorder/src/models/_entities/downloads.rs @@ -0,0 +1,75 @@ +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive( + Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, +)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "download_status")] +#[serde(rename_all = "snake_case")] +pub enum DownloadStatus { + #[sea_orm(string_value = "pending")] + Pending, + #[sea_orm(string_value = "downloading")] + Downloading, + #[sea_orm(string_value = "paused")] + Paused, + #[sea_orm(string_value = "completed")] + Completed, + #[sea_orm(string_value = "failed")] + Failed, + #[sea_orm(string_value = "deleted")] + Deleted, +} + +#[derive( + Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, +)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "download_mime")] +pub enum DownloadMime { + #[sea_orm(string_value = "application/octet-stream")] + #[serde(rename = "application/octet-stream")] + OctetStream, + #[sea_orm(string_value = "application/x-bittorrent")] + #[serde(rename = "application/x-bittorrent")] + BitTorrent, +} + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] +#[sea_orm(table_name = "downloads")] +pub struct Model { + pub created_at: DateTime, + pub updated_at: DateTime, + #[sea_orm(primary_key)] + pub id: i32, + pub origin_name: String, + pub display_name: String, + pub subscription_id: i32, + pub status: DownloadStatus, + pub mime: DownloadMime, + pub all_size: u64, + pub curr_size: u64, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::subscriptions::Entity", + from = "Column::SubscriptionId", + to = "super::subscriptions::Column::Id" + )] + Subscription, + #[sea_orm(has_many = "super::episodes::Entity")] + Episode, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Subscription.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Episode.def() + } +} diff --git a/crates/recorder/src/models/_entities/episodes.rs b/crates/recorder/src/models/_entities/episodes.rs index f265992..3a9e82b 100644 --- a/crates/recorder/src/models/_entities/episodes.rs +++ b/crates/recorder/src/models/_entities/episodes.rs @@ -12,19 +12,24 @@ pub struct Model { pub id: i32, pub display_name: String, pub bangumi_id: i32, - pub download_url: String, - pub download_progress: i32, pub output_name: String, + pub download_id: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { #[sea_orm( - belongs_to = "super::bangumi::Entity", - from = "Column::BangumiId", - to = "super::bangumi::Column::Id" + belongs_to = "super::bangumi::Entity", + from = "Column::BangumiId", + to = "super::bangumi::Column::Id" )] Bangumi, + #[sea_orm( + belongs_to = "super::downloads::Entity", + from = "Column::DownloadId", + to = "super::downloads::Column::Id" + )] + Downloads, } impl Related for Entity { @@ -32,3 +37,9 @@ impl Related for Entity { Relation::Bangumi.def() } } + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Downloads.def() + } +} diff --git a/crates/recorder/src/models/_entities/mod.rs b/crates/recorder/src/models/_entities/mod.rs index fc0c170..42e7c69 100644 --- a/crates/recorder/src/models/_entities/mod.rs +++ b/crates/recorder/src/models/_entities/mod.rs @@ -3,6 +3,7 @@ pub mod prelude; pub mod bangumi; +pub mod downloads; pub mod episodes; pub mod subscribers; pub mod subscriptions; diff --git a/crates/recorder/src/models/_entities/prelude.rs b/crates/recorder/src/models/_entities/prelude.rs index 621386c..b195379 100644 --- a/crates/recorder/src/models/_entities/prelude.rs +++ b/crates/recorder/src/models/_entities/prelude.rs @@ -1,6 +1,12 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.4 - pub use super::{ - bangumi::Entity as Bangumi, episodes::Entity as Episodes, subscribers::Entity as Subscribers, - subscriptions::Entity as Subscriptions, + bangumi, + bangumi::Entity as Bangumi, + downloads, + downloads::{DownloadMime, DownloadStatus, Entity as Download}, + episodes, + episodes::Entity as Episode, + subscribers, + subscribers::Entity as Subscriber, + subscriptions, + subscriptions::{Entity as Subscription, SubscriptionCategory}, }; diff --git a/crates/recorder/src/models/_entities/subscriptions.rs b/crates/recorder/src/models/_entities/subscriptions.rs index 3e2f42c..0908b54 100644 --- a/crates/recorder/src/models/_entities/subscriptions.rs +++ b/crates/recorder/src/models/_entities/subscriptions.rs @@ -3,7 +3,9 @@ use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[derive( + Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize, DeriveDisplay, +)] #[sea_orm( rs_type = "String", db_type = "Enum", @@ -22,7 +24,9 @@ pub enum SubscriptionCategory { #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "subscriptions")] pub struct Model { + #[sea_orm(column_type = "Timestamp")] pub created_at: DateTime, + #[sea_orm(column_type = "Timestamp")] pub updated_at: DateTime, #[sea_orm(primary_key)] pub id: i32, diff --git a/crates/recorder/src/models/bangumi.rs b/crates/recorder/src/models/bangumi.rs index bd8921a..049a8f5 100644 --- a/crates/recorder/src/models/bangumi.rs +++ b/crates/recorder/src/models/bangumi.rs @@ -3,4 +3,4 @@ use sea_orm::entity::prelude::*; pub use super::_entities::bangumi::{self, ActiveModel, Entity, Model}; #[async_trait::async_trait] -impl ActiveModelBehavior for super::_entities::bangumi::ActiveModel {} +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/recorder/src/models/downloads.rs b/crates/recorder/src/models/downloads.rs new file mode 100644 index 0000000..9d31af7 --- /dev/null +++ b/crates/recorder/src/models/downloads.rs @@ -0,0 +1,6 @@ +use sea_orm::ActiveModelBehavior; + +use crate::models::_entities::downloads::*; + +#[async_trait::async_trait] +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/recorder/src/models/episodes.rs b/crates/recorder/src/models/episodes.rs index c9116db..8c775aa 100644 --- a/crates/recorder/src/models/episodes.rs +++ b/crates/recorder/src/models/episodes.rs @@ -3,4 +3,4 @@ use sea_orm::entity::prelude::*; pub use super::_entities::episodes::{self, ActiveModel, Entity, Model}; #[async_trait::async_trait] -impl ActiveModelBehavior for super::_entities::episodes::ActiveModel {} +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/recorder/src/models/mod.rs b/crates/recorder/src/models/mod.rs index 830f28c..b09e145 100644 --- a/crates/recorder/src/models/mod.rs +++ b/crates/recorder/src/models/mod.rs @@ -1,5 +1,8 @@ pub mod _entities; pub mod bangumi; +pub mod downloads; pub mod episodes; pub mod subscribers; pub mod subscriptions; + +pub use _entities::prelude; diff --git a/crates/recorder/src/models/subscribers.rs b/crates/recorder/src/models/subscribers.rs index 64f36e1..aafcf54 100644 --- a/crates/recorder/src/models/subscribers.rs +++ b/crates/recorder/src/models/subscribers.rs @@ -12,7 +12,7 @@ pub struct SubscriberIdParams { } #[async_trait::async_trait] -impl ActiveModelBehavior for super::_entities::subscribers::ActiveModel { +impl ActiveModelBehavior for ActiveModel { async fn before_save(self, _db: &C, insert: bool) -> Result where C: ConnectionTrait, @@ -27,7 +27,7 @@ impl ActiveModelBehavior for super::_entities::subscribers::ActiveModel { } } -impl super::_entities::subscribers::Model { +impl Model { /// finds a user by the provided pid /// /// # Errors @@ -35,7 +35,7 @@ impl super::_entities::subscribers::Model { /// When could not find user or DB query error pub async fn find_by_pid(db: &DatabaseConnection, pid: &str) -> ModelResult { let parse_uuid = Uuid::parse_str(pid).map_err(|e| ModelError::Any(e.into()))?; - let subscriber = subscribers::Entity::find() + let subscriber = Entity::find() .filter(subscribers::Column::Pid.eq(parse_uuid)) .one(db) .await?; @@ -55,7 +55,7 @@ impl super::_entities::subscribers::Model { pub async fn create_root(db: &DatabaseConnection) -> ModelResult { let txn = db.begin().await?; - let user = subscribers::ActiveModel { + let user = ActiveModel { display_name: ActiveValue::set(ROOT_SUBSCRIBER.to_string()), pid: ActiveValue::set(ROOT_SUBSCRIBER.to_string()), ..Default::default() diff --git a/crates/recorder/src/models/subscriptions.rs b/crates/recorder/src/models/subscriptions.rs index a98a7b5..07f53c8 100644 --- a/crates/recorder/src/models/subscriptions.rs +++ b/crates/recorder/src/models/subscriptions.rs @@ -1,6 +1,79 @@ -use sea_orm::entity::prelude::*; +use sea_orm::{entity::prelude::*, ActiveValue}; -pub use super::_entities::subscriptions::{self, ActiveModel, Entity, Model}; +pub use super::_entities::subscriptions::{self, *}; +use crate::subscriptions::defs::RssCreateDto; #[async_trait::async_trait] -impl ActiveModelBehavior for super::_entities::subscriptions::ActiveModel {} +impl ActiveModelBehavior for ActiveModel {} + +impl Model { + pub async fn add_rss( + db: &DatabaseConnection, + create_dto: RssCreateDto, + subscriber_id: i32, + ) -> eyre::Result { + let subscription = ActiveModel { + display_name: ActiveValue::Set(create_dto.display_name), + enabled: ActiveValue::Set(create_dto.enabled.unwrap_or(false)), + aggregate: ActiveValue::Set(create_dto.aggregate), + subscriber_id: ActiveValue::Set(subscriber_id), + category: ActiveValue::Set(SubscriptionCategory::Mikan), + source_url: ActiveValue::Set(create_dto.rss_link), + ..Default::default() + }; + + Ok(subscription.insert(db).await?) + } + + pub async fn toggle_iters( + db: &DatabaseConnection, + ids: impl Iterator, + enabled: bool, + ) -> eyre::Result<()> { + Entity::update_many() + .col_expr(Column::Enabled, Expr::value(enabled)) + .filter(Column::Id.is_in(ids)) + .exec(db) + .await?; + Ok(()) + } + + pub async fn delete_iters( + db: &DatabaseConnection, + ids: impl Iterator, + ) -> eyre::Result<()> { + Entity::delete_many() + .filter(Column::Id.is_in(ids)) + .exec(db) + .await?; + Ok(()) + } + + // pub async fn pull_rss ( + // db: &DatabaseConnection, + // item: &Self, + // ) -> eyre::Result<()> { + // match &item.category { + // SubscriptionCategory::Mikan => { + // let items = + // MikanSubscriptionEngine::subscription_items_from_rss_url(&item.source_url). + // await?; let items = items.collect::>(); + // let torrent_urls = items.iter().map(|item| item.torrent_url()); + // + // let new_torrents = Entity::find() + // .filter( + // Column::SourceUrl + // ) + // .all(db).await?; + // + // for item in items { + // println!("{:?}", item); + // } + // } + // _ => { + // todo!("other subscription categories") + // } + // } + // Ok(()) + // } +} diff --git a/crates/recorder/src/rss/engine.rs b/crates/recorder/src/rss/engine.rs deleted file mode 100644 index 40eb662..0000000 --- a/crates/recorder/src/rss/engine.rs +++ /dev/null @@ -1,23 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use crate::models::subscriptions::subscriptions; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RssTorrent {} - -#[derive(Debug)] -pub struct RssEngine {} - -impl RssEngine { - // pub async fn get_rss_torrents( - // rss_subscription: &subscriptions::ActiveModel, - // ) -> eyre::Result> { - // Ok(()) - // } - - pub async fn get_torrents(url: &str) -> eyre::Result { - let content = reqwest::get(url).await?.bytes().await?; - let channel: rss::Channel = rss::Channel::read_from(&content[..])?; - Ok(channel) - } -} diff --git a/crates/recorder/src/rss/mod.rs b/crates/recorder/src/rss/mod.rs deleted file mode 100644 index 702e611..0000000 --- a/crates/recorder/src/rss/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod engine; diff --git a/crates/recorder/src/subscriptions/defs.rs b/crates/recorder/src/subscriptions/defs.rs new file mode 100644 index 0000000..79550cd --- /dev/null +++ b/crates/recorder/src/subscriptions/defs.rs @@ -0,0 +1,9 @@ +use crate::models::prelude::*; + +pub struct RssCreateDto { + pub rss_link: String, + pub display_name: String, + pub aggregate: bool, + pub category: SubscriptionCategory, + pub enabled: Option, +} \ No newline at end of file diff --git a/crates/recorder/src/subscriptions/mikan.rs b/crates/recorder/src/subscriptions/mikan.rs index 4d32be8..86e64b2 100644 --- a/crates/recorder/src/subscriptions/mikan.rs +++ b/crates/recorder/src/subscriptions/mikan.rs @@ -1,23 +1,49 @@ -use crate::rss::engine::RssEngine; +use crate::downloader::bytes::download_bytes; +use crate::downloader::defs::BITTORRENT_MIME_TYPE; -pub struct MikanRssCreateDto { - pub rss_link: String, - pub display_name: String, - pub aggregate: bool, - pub enabled: Option, +#[derive(Debug, Clone)] +pub struct MikanSubscriptionItem { + pub item: rss::Item, } -pub struct MikanSubscriptionEngine { -} - -impl MikanSubscriptionEngine { - pub async fn add_rss(create_dto: MikanRssCreateDto) -> eyre::Result<()> { - let content = reqwest::get(&create_dto.rss_link).await?.bytes().await?; - let channel = rss::Channel::read_from(&content[..])?; - - Ok(()) +impl From for MikanSubscriptionItem { + fn from(item: rss::Item) -> Self { + MikanSubscriptionItem { + item + } } } -pub struct MikanSubscriptionItem { +impl MikanSubscriptionItem { + pub fn title(&self) -> &str { + self.item.title().unwrap_or_default() + } + + pub fn homepage(&self) -> Option<&str> { + self.item.link() + } + + pub fn torrent_url (&self) -> Option<&str> { + self.item.enclosure().and_then(|en| { + if en.mime_type == BITTORRENT_MIME_TYPE { + Some(en.url.as_str()) + } else { + None + } + }) + } +} + +pub struct MikanSubscriptionEngine; + +impl MikanSubscriptionEngine { + pub async fn subscription_items_from_rss_url ( + url: &str + ) -> eyre::Result> { + let bytes = download_bytes(url).await?; + + let channel = rss::Channel::read_from(&bytes[..])?; + + Ok(channel.items.into_iter().map(MikanSubscriptionItem::from)) + } } diff --git a/crates/recorder/src/subscriptions/mod.rs b/crates/recorder/src/subscriptions/mod.rs index e3de33e..584bf75 100644 --- a/crates/recorder/src/subscriptions/mod.rs +++ b/crates/recorder/src/subscriptions/mod.rs @@ -1,2 +1,3 @@ +pub mod defs; pub mod bangumi; pub mod mikan; diff --git a/src/tasks/rss_dl.rs b/src/tasks/rss_dl.rs deleted file mode 100644 index 626a301..0000000 --- a/src/tasks/rss_dl.rs +++ /dev/null @@ -1,18 +0,0 @@ -use std::collections::BTreeMap; - -use loco_rs::prelude::*; - -pub struct RssDl; -#[async_trait] -impl Task for RssDl { - fn task(&self) -> TaskInfo { - TaskInfo { - name: "rss_dl".to_string(), - detail: "Task generator".to_string(), - } - } - async fn run(&self, _app_context: &AppContext, _vars: &BTreeMap) -> Result<()> { - println!("Task RssDl generated"); - Ok(()) - } -}