diff --git a/apps/recorder/src/extract/mikan/subscription.rs b/apps/recorder/src/extract/mikan/subscription.rs index 6cffaec..c59e27f 100644 --- a/apps/recorder/src/extract/mikan/subscription.rs +++ b/apps/recorder/src/extract/mikan/subscription.rs @@ -6,18 +6,16 @@ use std::{ use async_graphql::{InputObject, SimpleObject}; use fetch::fetch_bytes; -use futures::try_join; -use itertools::Itertools; +use futures::{Stream, TryStreamExt, pin_mut, try_join}; use maplit::hashmap; use sea_orm::{ - ActiveValue::Set, ColumnTrait, Condition, EntityTrait, JoinType, QueryFilter, QuerySelect, - RelationTrait, + ColumnTrait, Condition, EntityTrait, JoinType, QueryFilter, QuerySelect, RelationTrait, }; use serde::{Deserialize, Serialize}; -use snafu::OptionExt; +use snafu::{OptionExt, ResultExt}; use url::Url; -use super::scrape_mikan_bangumi_meta_list_from_season_flow_url; +use super::scrape_mikan_bangumi_meta_stream_from_season_flow_url; use crate::{ app::AppContextTrait, errors::{RecorderError, RecorderResult}, @@ -158,8 +156,8 @@ impl SubscriptionTrait for MikanSubscriberSubscription { self.id } - async fn sync_feeds(&self, ctx: Arc) -> RecorderResult<()> { - let rss_item_list = self.get_rss_item_list(ctx.as_ref()).await?; + async fn sync_feeds_incremental(&self, ctx: Arc) -> RecorderResult<()> { + let rss_item_list = self.get_rss_item_list_from_source_url(ctx.as_ref()).await?; sync_mikan_feeds_from_rss_item_list( ctx.as_ref(), @@ -172,6 +170,22 @@ impl SubscriptionTrait for MikanSubscriberSubscription { Ok(()) } + async fn sync_feeds_full(&self, ctx: Arc) -> RecorderResult<()> { + self.sync_feeds_incremental(ctx.clone()).await?; + + let rss_item_list = self + .get_rss_item_list_from_subsribed_url_rss_link(ctx.as_ref()) + .await?; + + sync_mikan_feeds_from_rss_item_list( + ctx.as_ref(), + rss_item_list, + self.get_subscriber_id(), + self.get_subscription_id(), + ) + .await + } + async fn sync_sources(&self, _ctx: Arc) -> RecorderResult<()> { Ok(()) } @@ -198,7 +212,7 @@ impl SubscriptionTrait for MikanSubscriberSubscription { impl MikanSubscriberSubscription { #[tracing::instrument(err, skip(ctx))] - async fn get_rss_item_list( + async fn get_rss_item_list_from_source_url( &self, ctx: &dyn AppContextTrait, ) -> RecorderResult> { @@ -213,13 +227,47 @@ impl MikanSubscriberSubscription { let mut result = vec![]; for (idx, item) in channel.items.into_iter().enumerate() { - let item = MikanRssItem::try_from(item).inspect_err( - |error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx), - )?; + let item = MikanRssItem::try_from(item) + .with_whatever_context::<_, String, RecorderError>(|_| { + format!("failed to extract rss item at idx {idx}") + })?; result.push(item); } Ok(result) } + + #[tracing::instrument(err, skip(ctx))] + async fn get_rss_item_list_from_subsribed_url_rss_link( + &self, + ctx: &dyn AppContextTrait, + ) -> RecorderResult> { + let subscribed_bangumi_list = + bangumi::Model::get_subsribed_bangumi_list_from_subscription(ctx, self.id).await?; + + let mut rss_item_list = vec![]; + for subscribed_bangumi in subscribed_bangumi_list { + let rss_url = subscribed_bangumi + .rss_link + .with_whatever_context::<_, String, RecorderError>(|| { + format!( + "rss link is required, subscription_id = {:?}, bangumi_name = {}", + self.id, subscribed_bangumi.display_name + ) + })?; + let bytes = fetch_bytes(ctx.mikan(), rss_url).await?; + + let channel = rss::Channel::read_from(&bytes[..])?; + + for (idx, item) in channel.items.into_iter().enumerate() { + let item = MikanRssItem::try_from(item) + .with_whatever_context::<_, String, RecorderError>(|_| { + format!("failed to extract rss item at idx {idx}") + })?; + rss_item_list.push(item); + } + } + Ok(rss_item_list) + } } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)] @@ -241,8 +289,10 @@ impl SubscriptionTrait for MikanSeasonSubscription { self.id } - async fn sync_feeds(&self, ctx: Arc) -> RecorderResult<()> { - let rss_item_list = self.get_rss_item_list(ctx.as_ref()).await?; + async fn sync_feeds_incremental(&self, ctx: Arc) -> RecorderResult<()> { + let rss_item_list = self + .get_rss_item_list_from_subsribed_url_rss_link(ctx.as_ref()) + .await?; sync_mikan_feeds_from_rss_item_list( ctx.as_ref(), @@ -255,31 +305,36 @@ impl SubscriptionTrait for MikanSeasonSubscription { Ok(()) } + async fn sync_feeds_full(&self, ctx: Arc) -> RecorderResult<()> { + self.sync_sources(ctx.clone()).await?; + self.sync_feeds_incremental(ctx).await + } + async fn sync_sources(&self, ctx: Arc) -> RecorderResult<()> { - let bangumi_meta_list = self.get_bangumi_meta_list(ctx.clone()).await?; + let bangumi_meta_list = self.get_bangumi_meta_stream_from_source_url(ctx.clone()); - let mikan_base_url = ctx.mikan().base_url(); + pin_mut!(bangumi_meta_list); - let rss_link_list = bangumi_meta_list - .into_iter() - .map(|bangumi_meta| { - build_mikan_bangumi_subscription_rss_url( - mikan_base_url.clone(), - &bangumi_meta.mikan_bangumi_id, - Some(&bangumi_meta.mikan_fansub_id), - ) - .to_string() - }) - .collect_vec(); - - subscriptions::Entity::update_many() - .set(subscriptions::ActiveModel { - source_urls: Set(Some(rss_link_list)), - ..Default::default() - }) - .filter(subscription_bangumi::Column::SubscriptionId.eq(self.id)) - .exec(ctx.db()) + while let Some(bangumi_meta) = bangumi_meta_list.try_next().await? { + let bangumi_hash = bangumi_meta.bangumi_hash(); + bangumi::Model::get_or_insert_from_mikan( + ctx.as_ref(), + bangumi_hash, + self.get_subscriber_id(), + self.get_subscription_id(), + async || { + let bangumi_am = bangumi::ActiveModel::from_mikan_bangumi_meta( + ctx.as_ref(), + bangumi_meta, + self.get_subscriber_id(), + self.get_subscription_id(), + ) + .await?; + Ok(bangumi_am) + }, + ) .await?; + } Ok(()) } @@ -290,8 +345,8 @@ impl SubscriptionTrait for MikanSeasonSubscription { let source_url_meta = MikanSeasonFlowUrlMeta::from_url(&source_url) .with_whatever_context::<_, String, RecorderError>(|| { format!( - "MikanSeasonSubscription should extract season_str and year from source_url, \ - source_url = {}, subscription_id = {}", + "season_str and year is required when extracting MikanSeasonSubscription from \ + source_url, source_url = {}, subscription_id = {}", source_url, model.id ) })?; @@ -300,7 +355,8 @@ impl SubscriptionTrait for MikanSeasonSubscription { .credential_id .with_whatever_context::<_, String, RecorderError>(|| { format!( - "MikanSeasonSubscription credential_id is required, subscription_id = {}", + "credential_id is required when extracting MikanSeasonSubscription, \ + subscription_id = {}", model.id ) })?; @@ -316,11 +372,10 @@ impl SubscriptionTrait for MikanSeasonSubscription { } impl MikanSeasonSubscription { - #[tracing::instrument(err, skip(ctx))] - async fn get_bangumi_meta_list( + pub fn get_bangumi_meta_stream_from_source_url( &self, ctx: Arc, - ) -> RecorderResult> { + ) -> impl Stream> { let credential_id = self.credential_id; let year = self.year; let season_str = self.season_str; @@ -328,16 +383,15 @@ impl MikanSeasonSubscription { let mikan_base_url = ctx.mikan().base_url().clone(); let mikan_season_flow_url = build_mikan_season_flow_url(mikan_base_url, year, season_str); - scrape_mikan_bangumi_meta_list_from_season_flow_url( + scrape_mikan_bangumi_meta_stream_from_season_flow_url( ctx, mikan_season_flow_url, credential_id, ) - .await } #[tracing::instrument(err, skip(ctx))] - async fn get_rss_item_list( + async fn get_rss_item_list_from_subsribed_url_rss_link( &self, ctx: &dyn AppContextTrait, ) -> RecorderResult> { @@ -358,8 +412,8 @@ impl MikanSeasonSubscription { .rss_link .with_whatever_context::<_, String, RecorderError>(|| { format!( - "MikanSeasonSubscription rss_link is required, subscription_id = {}", - self.id + "rss_link is required, subscription_id = {}, bangumi_name = {}", + self.id, subscribed_bangumi.display_name ) })?; let bytes = fetch_bytes(ctx.mikan(), rss_url).await?; @@ -367,9 +421,10 @@ impl MikanSeasonSubscription { let channel = rss::Channel::read_from(&bytes[..])?; for (idx, item) in channel.items.into_iter().enumerate() { - let item = MikanRssItem::try_from(item).inspect_err( - |error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx), - )?; + let item = MikanRssItem::try_from(item) + .with_whatever_context::<_, String, RecorderError>(|_| { + format!("failed to extract rss item at idx {idx}") + })?; rss_item_list.push(item); } } @@ -395,20 +450,24 @@ impl SubscriptionTrait for MikanBangumiSubscription { self.id } - async fn sync_feeds(&self, ctx: Arc) -> RecorderResult<()> { - let rss_item_list = self.get_rss_item_list(ctx.as_ref()).await?; + async fn sync_feeds_incremental(&self, ctx: Arc) -> RecorderResult<()> { + let rss_item_list = self.get_rss_item_list_from_source_url(ctx.as_ref()).await?; sync_mikan_feeds_from_rss_item_list( ctx.as_ref(), rss_item_list, - ::get_subscriber_id(self), - ::get_subscription_id(self), + self.get_subscriber_id(), + self.get_subscription_id(), ) .await?; Ok(()) } + async fn sync_feeds_full(&self, _ctx: Arc) -> RecorderResult<()> { + self.sync_feeds_incremental(_ctx).await + } + async fn sync_sources(&self, _ctx: Arc) -> RecorderResult<()> { Ok(()) } @@ -419,8 +478,8 @@ impl SubscriptionTrait for MikanBangumiSubscription { let meta = MikanBangumiHash::from_rss_url(&source_url) .with_whatever_context::<_, String, RecorderError>(|| { format!( - "MikanBangumiSubscription need to extract bangumi id and fansub id from \ - source_url = {}, subscription_id = {}", + "bangumi_id and fansub_id is required when extracting \ + MikanBangumiSubscription, source_url = {}, subscription_id = {}", source_url, model.id ) })?; @@ -436,7 +495,7 @@ impl SubscriptionTrait for MikanBangumiSubscription { impl MikanBangumiSubscription { #[tracing::instrument(err, skip(ctx))] - async fn get_rss_item_list( + async fn get_rss_item_list_from_source_url( &self, ctx: &dyn AppContextTrait, ) -> RecorderResult> { @@ -452,9 +511,10 @@ impl MikanBangumiSubscription { let mut result = vec![]; for (idx, item) in channel.items.into_iter().enumerate() { - let item = MikanRssItem::try_from(item).inspect_err( - |error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx), - )?; + let item = MikanRssItem::try_from(item) + .with_whatever_context::<_, String, RecorderError>(|_| { + format!("failed to extract rss item at idx {idx}") + })?; result.push(item); } Ok(result) diff --git a/apps/recorder/src/extract/mikan/web.rs b/apps/recorder/src/extract/mikan/web.rs index 1b56855..99e77ac 100644 --- a/apps/recorder/src/extract/mikan/web.rs +++ b/apps/recorder/src/extract/mikan/web.rs @@ -152,30 +152,12 @@ pub struct MikanBangumiMeta { pub fansub: String, } -#[async_graphql::Object] impl MikanBangumiMeta { - async fn homepage(&self) -> &str { - self.homepage.as_str() - } - - async fn origin_poster_src(&self) -> Option<&str> { - self.origin_poster_src.as_ref().map(|url| url.as_str()) - } - - async fn bangumi_title(&self) -> &str { - &self.bangumi_title - } - - async fn mikan_bangumi_id(&self) -> &str { - &self.mikan_bangumi_id - } - - async fn mikan_fansub_id(&self) -> &str { - &self.mikan_fansub_id - } - - async fn fansub(&self) -> &str { - &self.fansub + pub fn bangumi_hash(&self) -> MikanBangumiHash { + MikanBangumiHash { + mikan_bangumi_id: self.mikan_bangumi_id.clone(), + mikan_fansub_id: self.mikan_fansub_id.clone(), + } } } diff --git a/apps/recorder/src/graphql/infra/filter.rs b/apps/recorder/src/graphql/infra/filter.rs index 6ca5720..9714451 100644 --- a/apps/recorder/src/graphql/infra/filter.rs +++ b/apps/recorder/src/graphql/infra/filter.rs @@ -1,8 +1,21 @@ -use async_graphql::dynamic::{ObjectAccessor, TypeRef}; +use async_graphql::{ + InputObject, InputValueResult, Scalar, ScalarType, + dynamic::{ObjectAccessor, SchemaError, TypeRef}, +}; +use itertools::Itertools; use maplit::btreeset; use once_cell::sync::OnceCell; -use sea_orm::{ColumnTrait, Condition, EntityTrait, Value}; -use seaography::{BuilderContext, FilterInfo, FilterOperation, SeaResult}; +use sea_orm::{ + ColumnTrait, Condition, EntityTrait, + prelude::Expr, + sea_query::{self, IntoCondition, SimpleExpr, extension::postgres::PgExpr}, +}; +use seaography::{ + BuilderContext, FilterInfo, FilterOperation as SeaographqlFilterOperation, SeaResult, +}; +use serde_json::Value; + +use crate::errors::{RecorderError, RecorderResult}; pub static SUBSCRIBER_ID_FILTER_INFO: OnceCell = OnceCell::new(); @@ -10,7 +23,7 @@ pub fn init_custom_filter_info() { SUBSCRIBER_ID_FILTER_INFO.get_or_init(|| FilterInfo { type_name: String::from("SubscriberIdFilterInput"), base_type: TypeRef::INT.into(), - supported_operations: btreeset! { FilterOperation::Equals }, + supported_operations: btreeset! { SeaographqlFilterOperation::Equals }, }); } @@ -31,10 +44,10 @@ where let operations = &subscriber_id_filter_info.supported_operations; for operation in operations { match operation { - FilterOperation::Equals => { + SeaographqlFilterOperation::Equals => { if let Some(value) = filter.get("eq") { let value: i32 = value.i64()?.try_into()?; - let value = Value::Int(Some(value)); + let value = sea_orm::Value::Int(Some(value)); condition = condition.add(column.eq(value)); } } @@ -44,3 +57,441 @@ where Ok(condition) }) } + +#[derive(Clone, Debug, InputObject)] +pub struct StringFilterInput { + pub eq: Option, + pub ne: Option, + pub gt: Option, + pub gte: Option, + pub lt: Option, + pub lte: Option, + pub in_: Option>, + pub nin: Option>, + pub is_null: Option, + pub is_not_null: Option, + pub contains: Option, + pub starts_with: Option, + pub ends_with: Option, + pub like: Option, + pub not_like: Option, + pub between: Option, + pub not_between: Option, +} + +#[derive(Clone, Debug, InputObject)] +pub struct TextFilterInput { + pub eq: Option, + pub ne: Option, + pub gt: Option, + pub gte: Option, + pub lt: Option, + pub lte: Option, + pub in_: Option>, + pub nin: Option>, + pub is_null: Option, + pub between: Option, + pub not_between: Option, +} + +#[derive(Clone, Debug, InputObject)] +pub struct IntFilterInput { + pub eq: Option, + pub ne: Option, + pub gt: Option, + pub gte: Option, + pub lt: Option, + pub lte: Option, + pub in_: Option>, + pub nin: Option>, + pub is_null: Option, + pub is_not_null: Option, + pub between: Option, + pub not_between: Option, +} + +#[derive(Clone, Debug, InputObject)] +pub struct FloatFilterInput { + pub eq: Option, + pub ne: Option, + pub gt: Option, + pub gte: Option, + pub lt: Option, + pub lte: Option, + pub in_: Option>, + pub nin: Option>, + pub is_null: Option, + pub is_not_null: Option, + pub between: Option, + pub not_between: Option, +} + +#[derive(Clone, Debug, InputObject)] +pub struct BooleanFilterInput { + pub eq: Option, + pub ne: Option, + pub gt: Option, + pub gte: Option, + pub lt: Option, + pub lte: Option, + pub in_: Option>, + pub nin: Option>, + pub is_null: Option, + pub is_not_null: Option, +} + +#[derive(Clone, Debug, InputObject)] +pub struct JsonArrayFilterInput { + pub is_null: Option, + pub is_not_null: Option, + pub contains: Option, +} + +#[derive(Clone, Debug)] +pub struct JsonFilterInput(pub serde_json::Value); + +#[Scalar(name = "JsonFilterInput")] +impl ScalarType for JsonFilterInput { + fn parse(value: async_graphql::Value) -> InputValueResult { + Ok(JsonFilterInput(value.into_json()?)) + } + + fn to_value(&self) -> async_graphql::Value { + async_graphql::Value::from_json(self.0.clone()).unwrap() + } +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)] +pub enum JsonFilterOperation { + Equals, + NotEquals, + GreaterThan, + GreaterThanEquals, + LessThan, + LessThanEquals, + IsIn, + IsNotIn, + IsNull, + IsNotNull, + Contains, + StartsWith, + EndsWith, + Like, + NotLike, + Exists, + NotExists, + Between, + NotBetween, + And, + Or, +} + +impl JsonFilterOperation { + pub fn is_filter_operation(property_key: &str) -> bool { + property_key.starts_with("$") + } + + pub fn parse_str(value: &str) -> Result, async_graphql::dynamic::SchemaError> { + match value { + "$eq" => Ok(Some(JsonFilterOperation::Equals)), + "$ne" => Ok(Some(JsonFilterOperation::NotEquals)), + "$gt" => Ok(Some(JsonFilterOperation::GreaterThan)), + "$gte" => Ok(Some(JsonFilterOperation::GreaterThanEquals)), + "$lt" => Ok(Some(JsonFilterOperation::LessThan)), + "$lte" => Ok(Some(JsonFilterOperation::LessThanEquals)), + "$is_in" => Ok(Some(JsonFilterOperation::IsIn)), + "$is_not_in" => Ok(Some(JsonFilterOperation::IsNotIn)), + "$is_null" => Ok(Some(JsonFilterOperation::IsNull)), + "$is_not_null" => Ok(Some(JsonFilterOperation::IsNotNull)), + "$contains" => Ok(Some(JsonFilterOperation::Contains)), + "$starts_with" => Ok(Some(JsonFilterOperation::StartsWith)), + "$ends_with" => Ok(Some(JsonFilterOperation::EndsWith)), + "$like" => Ok(Some(JsonFilterOperation::Like)), + "$not_like" => Ok(Some(JsonFilterOperation::NotLike)), + "$between" => Ok(Some(JsonFilterOperation::Between)), + "$not_between" => Ok(Some(JsonFilterOperation::NotBetween)), + "$and" => Ok(Some(JsonFilterOperation::And)), + "$or" => Ok(Some(JsonFilterOperation::Or)), + "$exists" => Ok(Some(JsonFilterOperation::Exists)), + "$not_exists" => Ok(Some(JsonFilterOperation::NotExists)), + s if Self::is_filter_operation(s) => Err(async_graphql::dynamic::SchemaError(format!( + "Use reserved but not implemented filter operation: {value}" + ))), + _ => Ok(None), + } + } +} + +impl AsRef for JsonFilterOperation { + fn as_ref(&self) -> &str { + match self { + JsonFilterOperation::Equals => "$eq", + JsonFilterOperation::NotEquals => "$ne", + JsonFilterOperation::GreaterThan => "$gt", + JsonFilterOperation::GreaterThanEquals => "$gte", + JsonFilterOperation::LessThan => "$lt", + JsonFilterOperation::LessThanEquals => "$lte", + JsonFilterOperation::IsIn => "$is_in", + JsonFilterOperation::IsNotIn => "$is_not_in", + JsonFilterOperation::IsNull => "$is_null", + JsonFilterOperation::IsNotNull => "$is_not_null", + JsonFilterOperation::Contains => "$contains", + JsonFilterOperation::StartsWith => "$starts_with", + JsonFilterOperation::EndsWith => "$ends_with", + JsonFilterOperation::Like => "$like", + JsonFilterOperation::NotLike => "$not_like", + JsonFilterOperation::Between => "$between", + JsonFilterOperation::NotBetween => "$not_between", + JsonFilterOperation::And => "$and", + JsonFilterOperation::Or => "$or", + JsonFilterOperation::Exists => "$exists", + JsonFilterOperation::NotExists => "$not_exists", + } + } +} + +fn build_json_leaf_get_expr( + expr: impl Into, + path: &[&str], +) -> RecorderResult { + if path.is_empty() { + Err(async_graphql::dynamic::SchemaError( + "JsonFilterInput path must be at least one level deep".to_string(), + ))? + } + let mut expr = expr.into(); + for key in path { + expr = expr.get_json_field(*key); + } + Ok(expr) +} + +fn build_json_leaf_cast_expr( + expr: impl Into, + path: &[&str], +) -> RecorderResult { + if path.is_empty() { + Err(async_graphql::dynamic::SchemaError( + "JsonFilterInput path must be at least one level deep".to_string(), + ))? + } + let mut expr = expr.into(); + for key in path.iter().take(path.len() - 1) { + expr = expr.get_json_field(*key); + } + expr = expr.cast_json_field(path[path.len() - 1]); + Ok(expr) +} + +fn build_json_path_expr(path: &[&str]) -> SimpleExpr { + Expr::val(format!("$.{}", path.join("."))).into() +} + +fn build_json_path_exists_expr(col_expr: impl Into, path: &[&str]) -> SimpleExpr { + Expr::cust_with_exprs( + "JSON_EXISTS($1, $2)", + [col_expr.into(), build_json_path_expr(path)], + ) +} + +fn build_json_path_query_expr(col: impl Into, path: &[&str]) -> SimpleExpr { + Expr::cust_with_exprs("".to_string(), [col.into(), build_json_path_expr(path)]) +} + +fn build_json_value_is_in_expr( + col_expr: impl Into, + path: &[&str], + values: Vec, +) -> RecorderResult { + let template = format!( + "jsonb_path_query($1, $2) = ANY(ARRAY[{}]::jsonb[])", + (0..values.len()) + .map(|i| format!("${}::jsonb", i + 3)) + .join(",") + ); + let values = values + .into_iter() + .map(|v| serde_json::to_string(&v)) + .collect::, _>>()?; + let mut exprs = vec![col_expr.into(), build_json_path_expr(path)]; + exprs.extend(values.into_iter().map(|v| Expr::val(v).into())); + dbg!(&exprs); + Ok(Expr::cust_with_exprs(template, exprs)) +} + +fn prepare_json_leaf_condition( + col_expr: impl Into, + op: JsonFilterOperation, + value: Value, + path: &[&str], +) -> RecorderResult { + Ok(match (op, value) { + ( + op @ (JsonFilterOperation::Exists | JsonFilterOperation::NotExists), + Value::Bool(exists), + ) => { + let json_exists_expr = build_json_path_exists_expr(col_expr, path); + if (op == JsonFilterOperation::Exists && exists) + || (op == JsonFilterOperation::NotExists && !exists) + { + json_exists_expr.into_condition() + } else { + json_exists_expr.not().into_condition() + } + } + (JsonFilterOperation::Exists | JsonFilterOperation::NotExists, _) => { + Err(SchemaError(format!( + "JsonFilterInput leaf can not be $exists or $not_exists with a non-boolean value" + )))? + } + (JsonFilterOperation::And | JsonFilterOperation::Or, _) => { + unreachable!("JsonFilterInput leaf can not be $and or $or with any value") + } + (JsonFilterOperation::Equals, value) => { + let expr = build_json_leaf_get_expr(col_expr, path)?; + expr.eq(value).into_condition() + } + (JsonFilterOperation::NotEquals, value) => { + let expr = build_json_leaf_get_expr(col_expr, path)?; + expr.ne(value).into_condition() + } + + ( + JsonFilterOperation::GreaterThan + | JsonFilterOperation::GreaterThanEquals + | JsonFilterOperation::LessThan + | JsonFilterOperation::LessThanEquals, + Value::Array(_), + ) => Err(SchemaError(format!( + "JsonFilterInput leaf can not be {} with an array", + op.as_ref() + )))?, + (_, _) => todo!(), + }) +} + +// fn recursive_prepare_json_node_condition<'a, E>( +// expr: &'a E, +// node: Value, +// mut path: Vec<&'a str>, +// ) -> RecorderResult<(Condition, Vec<&'a str>)> +// where +// E: Into + Clone, +// { +// let object = node.as_object().ok_or(SchemaError(format!( +// "Json filter input node must be an object" +// )))?; + +// let mut conditions = Condition::all(); + +// for (key, value) in object { +// if let Some(operation) = JsonFilterOperation::parse_str(key)? { +// match operation { +// JsonFilterOperation::And => { +// let mut condition = Condition::all(); +// let filters = value.as_array().ok_or(SchemaError(format!( +// "$and operation must be an array of sub filters" +// )))?; + +// for filter in filters { +// let result = +// recursive_prepare_json_node_condition(expr, filter, path)?; +// condition = condition.add(result.0); path = result.1; +// } + +// conditions = conditions.add(condition); +// } +// JsonFilterOperation::Between => { +// let mut condition = Condition::any(); +// let values = value +// .as_array() +// .and_then(|arr| if arr.len() == 2 { Some(arr) } else +// { None }) .ok_or(SchemaError(format!( +// "$between operation must be an array of two +// values" )))?; + +// let (lhs, rhs) = (values[0], values[1]); +// let (lcondition, lpath) = +// recursive_prepare_json_node_condition(expr, lhs, +// path)?; condition = condition.add(lcondition); +// let (rcondition, rpath) = +// recursive_prepare_json_node_condition(expr, rhs, +// lpath)?; condition = condition.add(rcondition); +// path = rpath; +// conditions = conditions.add(condition); +// } +// op => conditions.add(prepare_json_leaf_condition(expr, op, +// value, &path)?), } +// } else { +// path.push(key as &'a str); +// let result = recursive_prepare_json_node_condition(expr, node, +// path)?; conditions = conditions.add(result.0); +// path = result.1; +// path.pop(); +// } +// } + +// Ok((conditions, path)) +// } + +#[cfg(test)] +mod tests { + use sea_orm::{ + DeriveIden, + sea_query::{PostgresQueryBuilder, Query, Value, Values}, + }; + + use super::*; + + #[derive(DeriveIden)] + enum TestTable { + Table, + Job, + } + + fn build_test_query_sql(where_expr: SimpleExpr) -> (String, Vec) { + let (sql, Values(values)) = Query::select() + .column(TestTable::Job) + .and_where(where_expr) + .from(TestTable::Table) + .build(PostgresQueryBuilder); + (sql, values) + } + + #[test] + fn test_build_json_path_exists_expr() { + let (sql, params) = build_test_query_sql(build_json_path_exists_expr( + Expr::col((TestTable::Table, TestTable::Job)), + &["a", "b", "c"], + )); + dbg!(¶ms); + assert_eq!( + sql, + "SELECT \"job\" FROM \"test_table\" WHERE JSON_EXISTS(\"test_table\".\"job\", $1)" + ); + let expected_params = vec![Value::String(Some(Box::new("$.a.b.c".into())))]; + assert_eq!(params, expected_params); + } + + #[test] + fn test_build_json_path_query_expr() -> RecorderResult<()> { + let (sql, params) = build_test_query_sql(build_json_value_is_in_expr( + Expr::col((TestTable::Table, TestTable::Job)), + &["a", "b", "c"], + vec![ + serde_json::json!(1), + serde_json::json!("str"), + serde_json::json!(true), + ], + )?); + + dbg!(¶ms); + assert_eq!( + sql, + "SELECT \"job\" FROM \"test_table\" WHERE jsonb_path_query(\"test_table\".\"job\", \ + $1) = ANY(ARRAY[$3::jsonb,$4::jsonb,$5::jsonb]::jsonb[])" + ); + + Ok(()) + } +} diff --git a/apps/recorder/src/graphql/infra/mod.rs b/apps/recorder/src/graphql/infra/mod.rs index c7a4aa3..4713341 100644 --- a/apps/recorder/src/graphql/infra/mod.rs +++ b/apps/recorder/src/graphql/infra/mod.rs @@ -1,4 +1,6 @@ pub mod filter; pub mod guard; +pub mod pagination; pub mod transformer; pub mod util; +pub mod order; diff --git a/apps/recorder/src/graphql/infra/order.rs b/apps/recorder/src/graphql/infra/order.rs new file mode 100644 index 0000000..e69de29 diff --git a/apps/recorder/src/graphql/infra/pagination.rs b/apps/recorder/src/graphql/infra/pagination.rs new file mode 100644 index 0000000..c590f8e --- /dev/null +++ b/apps/recorder/src/graphql/infra/pagination.rs @@ -0,0 +1,36 @@ +use async_graphql::{InputObject, SimpleObject}; + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, InputObject)] +pub struct CursorInput { + pub cursor: Option, + pub limit: u64, +} + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, InputObject)] +pub struct PageInput { + pub page: u64, + pub limit: u64, +} + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, InputObject)] +pub struct OffsetInput { + pub offset: u64, + pub limit: u64, +} + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, InputObject)] +pub struct PaginationInput { + pub cursor: Option, + pub page: Option, + pub offset: Option, +} + +pub type PageInfo = async_graphql::connection::PageInfo; + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, SimpleObject)] +pub struct PaginationInfo { + pub pages: u64, + pub current: u64, + pub offset: u64, + pub total: u64, +} diff --git a/apps/recorder/src/graphql/schema_root.rs b/apps/recorder/src/graphql/schema_root.rs index c9d3314..86e3283 100644 --- a/apps/recorder/src/graphql/schema_root.rs +++ b/apps/recorder/src/graphql/schema_root.rs @@ -79,6 +79,13 @@ pub fn schema( let context = CONTEXT.get_or_init(|| { let mut context = BuilderContext::default(); + context.pagination_input.type_name = "SeaographyPaginationInput".to_string(); + context.pagination_info_object.type_name = "SeaographyPaginationInfo".to_string(); + context.cursor_input.type_name = "SeaographyCursorInput".to_string(); + context.offset_input.type_name = "SeaographyOffsetInput".to_string(); + context.page_input.type_name = "SeaographyPageInput".to_string(); + context.page_info_object.type_name = "SeaographyPageInfo".to_string(); + restrict_subscriber_for_entity::( &mut context, &bangumi::Column::SubscriberId, diff --git a/apps/recorder/src/graphql/views/mod.rs b/apps/recorder/src/graphql/views/mod.rs index 0ed6393..c25c777 100644 --- a/apps/recorder/src/graphql/views/mod.rs +++ b/apps/recorder/src/graphql/views/mod.rs @@ -1 +1,2 @@ mod subscription; +mod task; diff --git a/apps/recorder/src/graphql/views/subscription.rs b/apps/recorder/src/graphql/views/subscription.rs index 6bd6e20..5b543f4 100644 --- a/apps/recorder/src/graphql/views/subscription.rs +++ b/apps/recorder/src/graphql/views/subscription.rs @@ -1,12 +1,10 @@ use std::sync::Arc; use async_graphql::{Context, InputObject, Object, Result as GraphQLResult, SimpleObject}; -use sea_orm::{DbErr, EntityTrait}; use crate::{ app::AppContextTrait, auth::AuthUserInfo, - errors::RecorderError, models::subscriptions::{self, SubscriptionTrait}, task::SubscriberTaskPayload, }; @@ -25,7 +23,7 @@ struct SyncOneSubscriptionTaskOutput { #[Object] impl SubscriptionMutation { - async fn sync_one_subscription_feeds( + async fn sync_one_subscription_feeds_incremental( &self, ctx: &Context<'_>, input: SyncOneSubscriptionFilterInput, @@ -35,24 +33,12 @@ impl SubscriptionMutation { let app_ctx = ctx.data::>()?; let subscriber_id = auth_user_info.subscriber_auth.subscriber_id; - let subscription_model = subscriptions::Entity::find_by_id(input.subscription_id) - .one(app_ctx.db()) - .await? - .ok_or_else(|| RecorderError::DbError { - source: DbErr::RecordNotFound(format!( - "Subscription id = {} not found", - input.subscription_id - )), - })?; - - if subscription_model.subscriber_id != subscriber_id { - Err(RecorderError::DbError { - source: DbErr::RecordNotFound(format!( - "Subscription id = {} not found", - input.subscription_id - )), - })?; - } + let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id( + app_ctx.as_ref(), + input.subscription_id, + subscriber_id, + ) + .await?; let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?; @@ -61,7 +47,40 @@ impl SubscriptionMutation { let task_id = task_service .add_subscriber_task( auth_user_info.subscriber_auth.subscriber_id, - SubscriberTaskPayload::SyncOneSubscriptionFeeds(subscription.into()), + SubscriberTaskPayload::SyncOneSubscriptionFeedsIncremental(subscription.into()), + ) + .await?; + + Ok(SyncOneSubscriptionTaskOutput { + task_id: task_id.to_string(), + }) + } + + async fn sync_one_subscription_feeds_full( + &self, + ctx: &Context<'_>, + input: SyncOneSubscriptionFilterInput, + ) -> GraphQLResult { + let auth_user_info = ctx.data::()?; + + let app_ctx = ctx.data::>()?; + let subscriber_id = auth_user_info.subscriber_auth.subscriber_id; + + let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id( + app_ctx.as_ref(), + input.subscription_id, + subscriber_id, + ) + .await?; + + let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?; + + let task_service = app_ctx.task(); + + let task_id = task_service + .add_subscriber_task( + auth_user_info.subscriber_auth.subscriber_id, + SubscriberTaskPayload::SyncOneSubscriptionFeedsFull(subscription.into()), ) .await?; @@ -80,24 +99,12 @@ impl SubscriptionMutation { let app_ctx = ctx.data::>()?; let subscriber_id = auth_user_info.subscriber_auth.subscriber_id; - let subscription_model = subscriptions::Entity::find_by_id(input.subscription_id) - .one(app_ctx.db()) - .await? - .ok_or_else(|| RecorderError::DbError { - source: DbErr::RecordNotFound(format!( - "Subscription id = {} not found", - input.subscription_id - )), - })?; - - if subscription_model.subscriber_id != subscriber_id { - Err(RecorderError::DbError { - source: DbErr::RecordNotFound(format!( - "Subscription id = {} not found", - input.subscription_id - )), - })?; - } + let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id( + app_ctx.as_ref(), + input.subscription_id, + subscriber_id, + ) + .await?; let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?; diff --git a/apps/recorder/src/graphql/views/task.rs b/apps/recorder/src/graphql/views/task.rs new file mode 100644 index 0000000..98e05be --- /dev/null +++ b/apps/recorder/src/graphql/views/task.rs @@ -0,0 +1,27 @@ +use std::sync::Arc; + +use async_graphql::{Context, InputObject, Object, Result as GraphQLResult}; + +use crate::{app::AppContextTrait, auth::AuthUserInfo}; + +struct TaskQuery; + +#[derive(InputObject)] +struct SubscriberTasksFilterInput { + pub subscription_id: Option, + pub task_id: Option, + pub task_type: Option, +} + +#[Object] +impl TaskQuery { + async fn subscriber_tasks(&self, ctx: &Context<'_>) -> GraphQLResult> { + let auth_user_info = ctx.data::()?; + let app_ctx = ctx.data::>()?; + let subscriber_id = auth_user_info.subscriber_auth.subscriber_id; + + let task_service = app_ctx.task(); + + todo!() + } +} diff --git a/apps/recorder/src/migrations/defs.rs b/apps/recorder/src/migrations/defs.rs index 61650b3..2e62e54 100644 --- a/apps/recorder/src/migrations/defs.rs +++ b/apps/recorder/src/migrations/defs.rs @@ -32,7 +32,6 @@ pub enum Subscriptions { SubscriberId, Category, SourceUrl, - SourceUrls, Enabled, CredentialId, } diff --git a/apps/recorder/src/migrations/m20220101_000001_init.rs b/apps/recorder/src/migrations/m20220101_000001_init.rs index 3e3c3df..4f77117 100644 --- a/apps/recorder/src/migrations/m20220101_000001_init.rs +++ b/apps/recorder/src/migrations/m20220101_000001_init.rs @@ -64,10 +64,6 @@ impl MigrationTrait for Migration { .col(string(Subscriptions::DisplayName)) .col(integer(Subscriptions::SubscriberId)) .col(text(Subscriptions::SourceUrl)) - .col(array_null( - Subscriptions::SourceUrls, - ColumnType::String(StringLen::None), - )) .col(boolean(Subscriptions::Enabled)) .col(enumeration( Subscriptions::Category, diff --git a/apps/recorder/src/models/bangumi.rs b/apps/recorder/src/models/bangumi.rs index 1c82e0c..2563950 100644 --- a/apps/recorder/src/models/bangumi.rs +++ b/apps/recorder/src/models/bangumi.rs @@ -315,4 +315,24 @@ impl Model { ) })) } + + pub async fn get_subsribed_bangumi_list_from_subscription( + ctx: &dyn AppContextTrait, + subscription_id: i32, + ) -> RecorderResult> { + let db = ctx.db(); + let bangumi_list = Entity::find() + .filter( + Condition::all() + .add(subscription_bangumi::Column::SubscriptionId.eq(subscription_id)), + ) + .join_rev( + JoinType::InnerJoin, + subscription_bangumi::Relation::Bangumi.def(), + ) + .all(db) + .await?; + + Ok(bangumi_list) + } } diff --git a/apps/recorder/src/models/mod.rs b/apps/recorder/src/models/mod.rs index 4fd1ce7..3ef7b58 100644 --- a/apps/recorder/src/models/mod.rs +++ b/apps/recorder/src/models/mod.rs @@ -5,6 +5,7 @@ pub mod downloaders; pub mod downloads; pub mod episodes; pub mod query; +pub mod subscriber_tasks; pub mod subscribers; pub mod subscription_bangumi; pub mod subscription_episode; diff --git a/apps/recorder/src/models/subscriber_tasks.rs b/apps/recorder/src/models/subscriber_tasks.rs new file mode 100644 index 0000000..c2cae68 --- /dev/null +++ b/apps/recorder/src/models/subscriber_tasks.rs @@ -0,0 +1,18 @@ +use sea_orm::entity::prelude::*; + +use crate::task::SubscriberTask; + +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] +#[sea_orm(table_name = "subscriber_tasks")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + pub subscriber_id: i32, + pub job: SubscriberTask, + pub state: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/apps/recorder/src/models/subscriptions.rs b/apps/recorder/src/models/subscriptions.rs index a462f5a..ae532f2 100644 --- a/apps/recorder/src/models/subscriptions.rs +++ b/apps/recorder/src/models/subscriptions.rs @@ -45,7 +45,6 @@ pub struct Model { pub subscriber_id: i32, pub category: SubscriptionCategory, pub source_url: String, - pub source_urls: Option>, pub enabled: bool, pub credential_id: Option, } @@ -176,14 +175,32 @@ impl Model { Ok(()) } - pub async fn sync_feeds(&self, ctx: Arc) -> RecorderResult<()> { - let subscription = self.try_into()?; - match subscription { - Subscription::MikanSubscriber(subscription) => subscription.sync_feeds(ctx).await, - Subscription::MikanSeason(subscription) => subscription.sync_feeds(ctx).await, - Subscription::MikanBangumi(subscription) => subscription.sync_feeds(ctx).await, - Subscription::Manual => Ok(()), + pub async fn find_by_id_and_subscriber_id( + ctx: &dyn AppContextTrait, + subscriber_id: i32, + subscription_id: i32, + ) -> RecorderResult { + let db = ctx.db(); + let subscription_model = Entity::find_by_id(subscription_id) + .one(db) + .await? + .ok_or_else(|| RecorderError::DbError { + source: DbErr::RecordNotFound(format!( + "Subscription id {subscription_id} not found or not belong to subscriber \ + {subscriber_id}", + )), + })?; + + if subscription_model.subscriber_id != subscriber_id { + Err(RecorderError::DbError { + source: DbErr::RecordNotFound(format!( + "Subscription id {subscription_id} not found or not belong to subscriber \ + {subscriber_id}", + )), + })?; } + + Ok(subscription_model) } } @@ -193,7 +210,9 @@ pub trait SubscriptionTrait: Sized + Debug { fn get_subscription_id(&self) -> i32; - async fn sync_feeds(&self, ctx: Arc) -> RecorderResult<()>; + async fn sync_feeds_incremental(&self, ctx: Arc) -> RecorderResult<()>; + + async fn sync_feeds_full(&self, ctx: Arc) -> RecorderResult<()>; async fn sync_sources(&self, ctx: Arc) -> RecorderResult<()>; @@ -244,11 +263,20 @@ impl SubscriptionTrait for Subscription { } } - async fn sync_feeds(&self, ctx: Arc) -> RecorderResult<()> { + async fn sync_feeds_incremental(&self, ctx: Arc) -> RecorderResult<()> { match self { - Self::MikanSubscriber(subscription) => subscription.sync_feeds(ctx).await, - Self::MikanSeason(subscription) => subscription.sync_feeds(ctx).await, - Self::MikanBangumi(subscription) => subscription.sync_feeds(ctx).await, + Self::MikanSubscriber(subscription) => subscription.sync_feeds_incremental(ctx).await, + Self::MikanSeason(subscription) => subscription.sync_feeds_incremental(ctx).await, + Self::MikanBangumi(subscription) => subscription.sync_feeds_incremental(ctx).await, + Self::Manual => Ok(()), + } + } + + async fn sync_feeds_full(&self, ctx: Arc) -> RecorderResult<()> { + match self { + Self::MikanSubscriber(subscription) => subscription.sync_feeds_full(ctx).await, + Self::MikanSeason(subscription) => subscription.sync_feeds_full(ctx).await, + Self::MikanBangumi(subscription) => subscription.sync_feeds_full(ctx).await, Self::Manual => Ok(()), } } diff --git a/apps/recorder/src/task/core.rs b/apps/recorder/src/task/core.rs index 4a8bec0..273bcc7 100644 --- a/apps/recorder/src/task/core.rs +++ b/apps/recorder/src/task/core.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use apalis::prelude::State; use futures::Stream; use serde::{Serialize, de::DeserializeOwned}; diff --git a/apps/recorder/src/task/mod.rs b/apps/recorder/src/task/mod.rs index 82e174d..ddaf22c 100644 --- a/apps/recorder/src/task/mod.rs +++ b/apps/recorder/src/task/mod.rs @@ -7,7 +7,7 @@ pub use core::{SUBSCRIBER_TASK_APALIS_NAME, SubscriberAsyncTaskTrait, Subscriber pub use config::TaskConfig; pub use registry::{ - SubscriberTask, SubscriberTaskPayload, SyncOneSubscriptionFeedsTask, + SubscriberTask, SubscriberTaskPayload, SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionSourcesTask, }; pub use service::TaskService; diff --git a/apps/recorder/src/task/registry/mod.rs b/apps/recorder/src/task/registry/mod.rs index 7b1aa72..e209c84 100644 --- a/apps/recorder/src/task/registry/mod.rs +++ b/apps/recorder/src/task/registry/mod.rs @@ -1,8 +1,12 @@ mod subscription; use std::sync::Arc; +use sea_orm::FromJsonQueryResult; use serde::{Deserialize, Serialize}; -pub use subscription::{SyncOneSubscriptionFeedsTask, SyncOneSubscriptionSourcesTask}; +pub use subscription::{ + SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask, + SyncOneSubscriptionSourcesTask, +}; use super::SubscriberAsyncTaskTrait; use crate::{ @@ -10,11 +14,26 @@ use crate::{ errors::{RecorderError, RecorderResult}, }; +#[derive(async_graphql::Enum, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Copy)] +pub enum SubscriberTaskType { + #[serde(rename = "sync_one_subscription_feeds_incremental")] + #[graphql(name = "sync_one_subscription_feeds_incremental")] + SyncOneSubscriptionFeedsIncremental, + #[serde(rename = "sync_one_subscription_feeds_full")] + #[graphql(name = "sync_one_subscription_feeds_full")] + SyncOneSubscriptionFeedsFull, + #[serde(rename = "sync_one_subscription_sources")] + #[graphql(name = "sync_one_subscription_sources")] + SyncOneSubscriptionSources, +} + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(tag = "task_type")] pub enum SubscriberTaskPayload { - #[serde(rename = "sync_one_subscription_feeds")] - SyncOneSubscriptionFeeds(SyncOneSubscriptionFeedsTask), + #[serde(rename = "sync_one_subscription_feeds_incremental")] + SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask), + #[serde(rename = "sync_one_subscription_feeds_full")] + SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask), #[serde(rename = "sync_one_subscription_sources")] SyncOneSubscriptionSources(SyncOneSubscriptionSourcesTask), } @@ -22,10 +41,23 @@ pub enum SubscriberTaskPayload { impl SubscriberTaskPayload { pub async fn run(self, ctx: Arc) -> RecorderResult<()> { match self { - Self::SyncOneSubscriptionFeeds(task) => task.run(ctx).await, + Self::SyncOneSubscriptionFeedsIncremental(task) => task.run(ctx).await, + Self::SyncOneSubscriptionFeedsFull(task) => task.run(ctx).await, Self::SyncOneSubscriptionSources(task) => task.run(ctx).await, } } + + pub fn task_type(&self) -> SubscriberTaskType { + match self { + Self::SyncOneSubscriptionFeedsIncremental(_) => { + SubscriberTaskType::SyncOneSubscriptionFeedsIncremental + } + Self::SyncOneSubscriptionFeedsFull(_) => { + SubscriberTaskType::SyncOneSubscriptionFeedsFull + } + Self::SyncOneSubscriptionSources(_) => SubscriberTaskType::SyncOneSubscriptionSources, + } + } } impl TryFrom<&SubscriberTaskPayload> for serde_json::Value { @@ -45,7 +77,7 @@ impl TryFrom<&SubscriberTaskPayload> for serde_json::Value { } } -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)] pub struct SubscriberTask { pub subscriber_id: i32, #[serde(flatten)] diff --git a/apps/recorder/src/task/registry/subscription.rs b/apps/recorder/src/task/registry/subscription.rs index 7e77f17..28b1235 100644 --- a/apps/recorder/src/task/registry/subscription.rs +++ b/apps/recorder/src/task/registry/subscription.rs @@ -11,18 +11,35 @@ use crate::{ }; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct SyncOneSubscriptionFeedsTask(pub subscriptions::Subscription); +pub struct SyncOneSubscriptionFeedsIncrementalTask(pub subscriptions::Subscription); -impl From for SyncOneSubscriptionFeedsTask { +impl From for SyncOneSubscriptionFeedsIncrementalTask { fn from(subscription: subscriptions::Subscription) -> Self { Self(subscription) } } #[async_trait::async_trait] -impl SubscriberAsyncTaskTrait for SyncOneSubscriptionFeedsTask { +impl SubscriberAsyncTaskTrait for SyncOneSubscriptionFeedsIncrementalTask { async fn run_async(self, ctx: Arc) -> RecorderResult<()> { - self.0.sync_feeds(ctx).await?; + self.0.sync_feeds_incremental(ctx).await?; + Ok(()) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct SyncOneSubscriptionFeedsFullTask(pub subscriptions::Subscription); + +impl From for SyncOneSubscriptionFeedsFullTask { + fn from(subscription: subscriptions::Subscription) -> Self { + Self(subscription) + } +} + +#[async_trait::async_trait] +impl SubscriberAsyncTaskTrait for SyncOneSubscriptionFeedsFullTask { + async fn run_async(self, ctx: Arc) -> RecorderResult<()> { + self.0.sync_feeds_full(ctx).await?; Ok(()) } } diff --git a/apps/recorder/src/task/service.rs b/apps/recorder/src/task/service.rs index 7433adf..2ce00b7 100644 --- a/apps/recorder/src/task/service.rs +++ b/apps/recorder/src/task/service.rs @@ -13,7 +13,7 @@ use crate::{ pub struct TaskService { pub config: TaskConfig, ctx: Arc, - subscriber_task_storage: Arc>>, + pub subscriber_task_storage: Arc>>, } impl TaskService {