diff --git a/Cargo.lock b/Cargo.lock index cc794ab..77d038b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2814,6 +2814,7 @@ dependencies = [ "futures", "include_dir", "insta", + "itertools", "loco-rs", "reqwest", "rss", diff --git a/crates/recorder/Cargo.toml b/crates/recorder/Cargo.toml index 58a19bd..a971365 100644 --- a/crates/recorder/Cargo.toml +++ b/crates/recorder/Cargo.toml @@ -35,6 +35,7 @@ thiserror = "1.0.57" rss = "2.0.7" bytes = "1.5.0" futures = "0.3.30" +itertools = "0.12.1" [lib] name = "recorder" diff --git a/crates/recorder/src/migrations/defs.rs b/crates/recorder/src/migrations/defs.rs index aa2c999..b5bb9c1 100644 --- a/crates/recorder/src/migrations/defs.rs +++ b/crates/recorder/src/migrations/defs.rs @@ -62,6 +62,7 @@ pub enum Downloads { CurrSize, AllSize, Mime, + Url, } #[async_trait::async_trait] diff --git a/crates/recorder/src/migrations/m20240224_082543_add_downloads.rs b/crates/recorder/src/migrations/m20240224_082543_add_downloads.rs index f59028f..a4ead9f 100644 --- a/crates/recorder/src/migrations/m20240224_082543_add_downloads.rs +++ b/crates/recorder/src/migrations/m20240224_082543_add_downloads.rs @@ -51,6 +51,10 @@ impl MigrationTrait for Migration { )) .col(big_unsigned(Downloads::AllSize)) .col(big_unsigned(Downloads::CurrSize)) + .col(text(Downloads::Url)) + .index( + Index::create().table(Downloads::Table).col(Downloads::Url).name("idx_download_url") + ) .foreign_key( ForeignKey::create() .name("fk_download_subscription_id") diff --git a/crates/recorder/src/models/_entities/downloads.rs b/crates/recorder/src/models/_entities/downloads.rs index ed88caf..020804d 100644 --- a/crates/recorder/src/models/_entities/downloads.rs +++ b/crates/recorder/src/models/_entities/downloads.rs @@ -2,7 +2,7 @@ use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; #[derive( - Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, +Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, )] #[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "download_status")] #[serde(rename_all = "snake_case")] @@ -22,7 +22,7 @@ pub enum DownloadStatus { } #[derive( - Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, +Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, DeriveDisplay, Serialize, Deserialize, )] #[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "download_mime")] pub enum DownloadMime { @@ -46,6 +46,7 @@ pub struct Model { pub subscription_id: i32, pub status: DownloadStatus, pub mime: DownloadMime, + pub url: String, pub all_size: u64, pub curr_size: u64, } @@ -53,9 +54,9 @@ pub struct Model { #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { #[sea_orm( - belongs_to = "super::subscriptions::Entity", - from = "Column::SubscriptionId", - to = "super::subscriptions::Column::Id" + belongs_to = "super::subscriptions::Entity", + from = "Column::SubscriptionId", + to = "super::subscriptions::Column::Id" )] Subscription, #[sea_orm(has_many = "super::episodes::Entity")] diff --git a/crates/recorder/src/models/downloads.rs b/crates/recorder/src/models/downloads.rs index 9d31af7..497ed45 100644 --- a/crates/recorder/src/models/downloads.rs +++ b/crates/recorder/src/models/downloads.rs @@ -1,6 +1,67 @@ -use sea_orm::ActiveModelBehavior; +use std::collections::HashMap; +use sea_orm::{prelude::*, ActiveValue, Condition, QuerySelect, SelectColumns}; use crate::models::_entities::downloads::*; +use crate::models::prelude::{SubscriptionCategory, subscriptions}; +use crate::subscriptions::mikan::MikanSubscriptionEngine; #[async_trait::async_trait] impl ActiveModelBehavior for ActiveModel {} + +impl Model { + pub async fn pull_subscription( + db: &DatabaseConnection, + item: &subscriptions::Model, + ) -> eyre::Result<()> { + match &item.category { + SubscriptionCategory::Mikan => { + let items = + MikanSubscriptionEngine::subscription_items_from_rss_url(&item.source_url). + await?; + let items = items.collect::>(); + let mut all_items = items + .into_iter() + .map(|item| (item.url.clone(), item)) + .collect::>(); + + let existed_items = { + Entity::find() + .filter( + Condition::all() + .add(Column::SubscriptionId.eq(item.id)) + .add(Column::Url.is_in(all_items.keys().cloned())) + ) + .select_only() + .select_column(Column::Url) + .all(db).await? + }; + + for dl in existed_items { + all_items.remove(&dl.url as &str); + } + + let new_items = all_items.into_values().map(|i| { + ActiveModel { + origin_name: ActiveValue::Set(i.title.clone()), + display_name: ActiveValue::Set(i.title), + subscription_id: ActiveValue::Set(item.id), + status: ActiveValue::Set(DownloadStatus::Pending), + mime: ActiveValue::Set(DownloadMime::BitTorrent), + url: ActiveValue::Set(i.url), + all_size: ActiveValue::Set(i.content_length.unwrap_or_default()), + curr_size: ActiveValue::Set(0), + ..Default::default() + } + }); + + Entity::insert_many(new_items) + .exec(db) + .await?; + } + _ => { + todo!("other subscription categories") + } + } + Ok(()) + } +} diff --git a/crates/recorder/src/models/subscriptions.rs b/crates/recorder/src/models/subscriptions.rs index 07f53c8..f4f7d4b 100644 --- a/crates/recorder/src/models/subscriptions.rs +++ b/crates/recorder/src/models/subscriptions.rs @@ -48,32 +48,4 @@ impl Model { .await?; Ok(()) } - - // pub async fn pull_rss ( - // db: &DatabaseConnection, - // item: &Self, - // ) -> eyre::Result<()> { - // match &item.category { - // SubscriptionCategory::Mikan => { - // let items = - // MikanSubscriptionEngine::subscription_items_from_rss_url(&item.source_url). - // await?; let items = items.collect::>(); - // let torrent_urls = items.iter().map(|item| item.torrent_url()); - // - // let new_torrents = Entity::find() - // .filter( - // Column::SourceUrl - // ) - // .all(db).await?; - // - // for item in items { - // println!("{:?}", item); - // } - // } - // _ => { - // todo!("other subscription categories") - // } - // } - // Ok(()) - // } } diff --git a/crates/recorder/src/subscriptions/mikan.rs b/crates/recorder/src/subscriptions/mikan.rs index 86e64b2..5f91943 100644 --- a/crates/recorder/src/subscriptions/mikan.rs +++ b/crates/recorder/src/subscriptions/mikan.rs @@ -3,47 +3,46 @@ use crate::downloader::defs::BITTORRENT_MIME_TYPE; #[derive(Debug, Clone)] pub struct MikanSubscriptionItem { - pub item: rss::Item, -} - -impl From for MikanSubscriptionItem { - fn from(item: rss::Item) -> Self { - MikanSubscriptionItem { - item - } - } + pub title: String, + pub home_page: Option, + pub url: String, + pub content_length: Option, + pub mime: String, + pub pub_date: Option, } impl MikanSubscriptionItem { - pub fn title(&self) -> &str { - self.item.title().unwrap_or_default() - } - - pub fn homepage(&self) -> Option<&str> { - self.item.link() - } - - pub fn torrent_url (&self) -> Option<&str> { - self.item.enclosure().and_then(|en| { - if en.mime_type == BITTORRENT_MIME_TYPE { - Some(en.url.as_str()) - } else { - None - } - }) + pub fn from_rss_item(item: rss::Item) -> Option { + let mime_match = item.enclosure() + .map(|x| x.mime_type == BITTORRENT_MIME_TYPE) + .unwrap_or_default(); + if mime_match { + let enclosure = item.enclosure.unwrap(); + let content_length = enclosure.length.parse().ok(); + Some(MikanSubscriptionItem { + title: item.title.unwrap_or_default(), + home_page: item.link, + url: enclosure.url, + content_length, + mime: enclosure.mime_type, + pub_date: item.pub_date, + }) + } else { + None + } } } pub struct MikanSubscriptionEngine; impl MikanSubscriptionEngine { - pub async fn subscription_items_from_rss_url ( + pub async fn subscription_items_from_rss_url( url: &str - ) -> eyre::Result> { + ) -> eyre::Result> { let bytes = download_bytes(url).await?; let channel = rss::Channel::read_from(&bytes[..])?; - Ok(channel.items.into_iter().map(MikanSubscriptionItem::from)) + Ok(channel.items.into_iter().flat_map(MikanSubscriptionItem::from_rss_item)) } }