refactor: continue

This commit is contained in: master
2025-05-14 02:01:59 +08:00
parent bf270e4e87
commit 8600bf216a
21 changed files with 842 additions and 157 deletions

View File

@@ -6,18 +6,16 @@ use std::{
use async_graphql::{InputObject, SimpleObject};
use fetch::fetch_bytes;
use futures::try_join;
use itertools::Itertools;
use futures::{Stream, TryStreamExt, pin_mut, try_join};
use maplit::hashmap;
use sea_orm::{
ActiveValue::Set, ColumnTrait, Condition, EntityTrait, JoinType, QueryFilter, QuerySelect,
RelationTrait,
ColumnTrait, Condition, EntityTrait, JoinType, QueryFilter, QuerySelect, RelationTrait,
};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use url::Url;
use super::scrape_mikan_bangumi_meta_list_from_season_flow_url;
use super::scrape_mikan_bangumi_meta_stream_from_season_flow_url;
use crate::{
app::AppContextTrait,
errors::{RecorderError, RecorderResult},
@@ -158,8 +156,8 @@ impl SubscriptionTrait for MikanSubscriberSubscription {
self.id
}
async fn sync_feeds(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let rss_item_list = self.get_rss_item_list(ctx.as_ref()).await?;
async fn sync_feeds_incremental(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let rss_item_list = self.get_rss_item_list_from_source_url(ctx.as_ref()).await?;
sync_mikan_feeds_from_rss_item_list(
ctx.as_ref(),
@@ -172,6 +170,22 @@ impl SubscriptionTrait for MikanSubscriberSubscription {
Ok(())
}
async fn sync_feeds_full(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
self.sync_feeds_incremental(ctx.clone()).await?;
let rss_item_list = self
.get_rss_item_list_from_subscribed_url_rss_link(ctx.as_ref())
.await?;
sync_mikan_feeds_from_rss_item_list(
ctx.as_ref(),
rss_item_list,
self.get_subscriber_id(),
self.get_subscription_id(),
)
.await
}
async fn sync_sources(&self, _ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
Ok(())
}
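A minimal sketch of how the incremental/full split might be driven; the driver function below is hypothetical, while the trait methods and types come from this diff:

    async fn run_sync(
        subscription: &impl SubscriptionTrait,
        ctx: Arc<dyn AppContextTrait>,
        full: bool,
    ) -> RecorderResult<()> {
        if full {
            // Full sync: re-sync sources, then pull every subscribed RSS feed.
            subscription.sync_feeds_full(ctx).await
        } else {
            // Incremental sync: only read the subscription's own source URL feed.
            subscription.sync_feeds_incremental(ctx).await
        }
    }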
@@ -198,7 +212,7 @@ impl SubscriptionTrait for MikanSubscriberSubscription {
impl MikanSubscriberSubscription {
#[tracing::instrument(err, skip(ctx))]
async fn get_rss_item_list(
async fn get_rss_item_list_from_source_url(
&self,
ctx: &dyn AppContextTrait,
) -> RecorderResult<Vec<MikanRssItem>> {
@@ -213,13 +227,47 @@ impl MikanSubscriberSubscription {
let mut result = vec![];
for (idx, item) in channel.items.into_iter().enumerate() {
let item = MikanRssItem::try_from(item).inspect_err(
|error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx),
)?;
let item = MikanRssItem::try_from(item)
.with_whatever_context::<_, String, RecorderError>(|_| {
format!("failed to extract rss item at idx {idx}")
})?;
result.push(item);
}
Ok(result)
}
#[tracing::instrument(err, skip(ctx))]
async fn get_rss_item_list_from_subscribed_url_rss_link(
&self,
ctx: &dyn AppContextTrait,
) -> RecorderResult<Vec<MikanRssItem>> {
let subscribed_bangumi_list =
bangumi::Model::get_subscribed_bangumi_list_from_subscription(ctx, self.id).await?;
let mut rss_item_list = vec![];
for subscribed_bangumi in subscribed_bangumi_list {
let rss_url = subscribed_bangumi
.rss_link
.with_whatever_context::<_, String, RecorderError>(|| {
format!(
"rss link is required, subscription_id = {:?}, bangumi_name = {}",
self.id, subscribed_bangumi.display_name
)
})?;
let bytes = fetch_bytes(ctx.mikan(), rss_url).await?;
let channel = rss::Channel::read_from(&bytes[..])?;
for (idx, item) in channel.items.into_iter().enumerate() {
let item = MikanRssItem::try_from(item)
.with_whatever_context::<_, String, RecorderError>(|_| {
format!("failed to extract rss item at idx {idx}")
})?;
rss_item_list.push(item);
}
}
Ok(rss_item_list)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, InputObject, SimpleObject)]
@@ -241,8 +289,10 @@ impl SubscriptionTrait for MikanSeasonSubscription {
self.id
}
async fn sync_feeds(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let rss_item_list = self.get_rss_item_list(ctx.as_ref()).await?;
async fn sync_feeds_incremental(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let rss_item_list = self
.get_rss_item_list_from_subscribed_url_rss_link(ctx.as_ref())
.await?;
sync_mikan_feeds_from_rss_item_list(
ctx.as_ref(),
@@ -255,31 +305,36 @@ impl SubscriptionTrait for MikanSeasonSubscription {
Ok(())
}
async fn sync_feeds_full(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
self.sync_sources(ctx.clone()).await?;
self.sync_feeds_incremental(ctx).await
}
async fn sync_sources(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let bangumi_meta_list = self.get_bangumi_meta_list(ctx.clone()).await?;
let bangumi_meta_list = self.get_bangumi_meta_stream_from_source_url(ctx.clone());
let mikan_base_url = ctx.mikan().base_url();
pin_mut!(bangumi_meta_list);
let rss_link_list = bangumi_meta_list
.into_iter()
.map(|bangumi_meta| {
build_mikan_bangumi_subscription_rss_url(
mikan_base_url.clone(),
&bangumi_meta.mikan_bangumi_id,
Some(&bangumi_meta.mikan_fansub_id),
)
.to_string()
})
.collect_vec();
subscriptions::Entity::update_many()
.set(subscriptions::ActiveModel {
source_urls: Set(Some(rss_link_list)),
..Default::default()
})
.filter(subscription_bangumi::Column::SubscriptionId.eq(self.id))
.exec(ctx.db())
while let Some(bangumi_meta) = bangumi_meta_list.try_next().await? {
let bangumi_hash = bangumi_meta.bangumi_hash();
bangumi::Model::get_or_insert_from_mikan(
ctx.as_ref(),
bangumi_hash,
self.get_subscriber_id(),
self.get_subscription_id(),
async || {
let bangumi_am = bangumi::ActiveModel::from_mikan_bangumi_meta(
ctx.as_ref(),
bangumi_meta,
self.get_subscriber_id(),
self.get_subscription_id(),
)
.await?;
Ok(bangumi_am)
},
)
.await?;
}
Ok(())
}
@@ -290,8 +345,8 @@ impl SubscriptionTrait for MikanSeasonSubscription {
let source_url_meta = MikanSeasonFlowUrlMeta::from_url(&source_url)
.with_whatever_context::<_, String, RecorderError>(|| {
format!(
"MikanSeasonSubscription should extract season_str and year from source_url, \
source_url = {}, subscription_id = {}",
"season_str and year is required when extracting MikanSeasonSubscription from \
source_url, source_url = {}, subscription_id = {}",
source_url, model.id
)
})?;
@@ -300,7 +355,8 @@ impl SubscriptionTrait for MikanSeasonSubscription {
.credential_id
.with_whatever_context::<_, String, RecorderError>(|| {
format!(
"MikanSeasonSubscription credential_id is required, subscription_id = {}",
"credential_id is required when extracting MikanSeasonSubscription, \
subscription_id = {}",
model.id
)
})?;
@@ -316,11 +372,10 @@ impl SubscriptionTrait for MikanSeasonSubscription {
}
impl MikanSeasonSubscription {
#[tracing::instrument(err, skip(ctx))]
async fn get_bangumi_meta_list(
pub fn get_bangumi_meta_stream_from_source_url(
&self,
ctx: Arc<dyn AppContextTrait>,
) -> RecorderResult<Vec<MikanBangumiMeta>> {
) -> impl Stream<Item = RecorderResult<MikanBangumiMeta>> {
let credential_id = self.credential_id;
let year = self.year;
let season_str = self.season_str;
@@ -328,16 +383,15 @@ impl MikanSeasonSubscription {
let mikan_base_url = ctx.mikan().base_url().clone();
let mikan_season_flow_url = build_mikan_season_flow_url(mikan_base_url, year, season_str);
scrape_mikan_bangumi_meta_list_from_season_flow_url(
scrape_mikan_bangumi_meta_stream_from_season_flow_url(
ctx,
mikan_season_flow_url,
credential_id,
)
.await
}
#[tracing::instrument(err, skip(ctx))]
async fn get_rss_item_list(
async fn get_rss_item_list_from_subscribed_url_rss_link(
&self,
ctx: &dyn AppContextTrait,
) -> RecorderResult<Vec<MikanRssItem>> {
@@ -358,8 +412,8 @@ impl MikanSeasonSubscription {
.rss_link
.with_whatever_context::<_, String, RecorderError>(|| {
format!(
"MikanSeasonSubscription rss_link is required, subscription_id = {}",
self.id
"rss_link is required, subscription_id = {}, bangumi_name = {}",
self.id, subscribed_bangumi.display_name
)
})?;
let bytes = fetch_bytes(ctx.mikan(), rss_url).await?;
@@ -367,9 +421,10 @@ impl MikanSeasonSubscription {
let channel = rss::Channel::read_from(&bytes[..])?;
for (idx, item) in channel.items.into_iter().enumerate() {
let item = MikanRssItem::try_from(item).inspect_err(
|error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx),
)?;
let item = MikanRssItem::try_from(item)
.with_whatever_context::<_, String, RecorderError>(|_| {
format!("failed to extract rss item at idx {idx}")
})?;
rss_item_list.push(item);
}
}
@@ -395,20 +450,24 @@ impl SubscriptionTrait for MikanBangumiSubscription {
self.id
}
async fn sync_feeds(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let rss_item_list = self.get_rss_item_list(ctx.as_ref()).await?;
async fn sync_feeds_incremental(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let rss_item_list = self.get_rss_item_list_from_source_url(ctx.as_ref()).await?;
sync_mikan_feeds_from_rss_item_list(
ctx.as_ref(),
rss_item_list,
<Self as SubscriptionTrait>::get_subscriber_id(self),
<Self as SubscriptionTrait>::get_subscription_id(self),
self.get_subscriber_id(),
self.get_subscription_id(),
)
.await?;
Ok(())
}
async fn sync_feeds_full(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
self.sync_feeds_incremental(ctx).await
}
async fn sync_sources(&self, _ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
Ok(())
}
@@ -419,8 +478,8 @@ impl SubscriptionTrait for MikanBangumiSubscription {
let meta = MikanBangumiHash::from_rss_url(&source_url)
.with_whatever_context::<_, String, RecorderError>(|| {
format!(
"MikanBangumiSubscription need to extract bangumi id and fansub id from \
source_url = {}, subscription_id = {}",
"bangumi_id and fansub_id is required when extracting \
MikanBangumiSubscription, source_url = {}, subscription_id = {}",
source_url, model.id
)
})?;
@@ -436,7 +495,7 @@ impl SubscriptionTrait for MikanBangumiSubscription {
impl MikanBangumiSubscription {
#[tracing::instrument(err, skip(ctx))]
async fn get_rss_item_list(
async fn get_rss_item_list_from_source_url(
&self,
ctx: &dyn AppContextTrait,
) -> RecorderResult<Vec<MikanRssItem>> {
@@ -452,9 +511,10 @@ impl MikanBangumiSubscription {
let mut result = vec![];
for (idx, item) in channel.items.into_iter().enumerate() {
let item = MikanRssItem::try_from(item).inspect_err(
|error| tracing::warn!(error = %error, "failed to extract rss item idx = {}", idx),
)?;
let item = MikanRssItem::try_from(item)
.with_whatever_context::<_, String, RecorderError>(|_| {
format!("failed to extract rss item at idx {idx}")
})?;
result.push(item);
}
Ok(result)

View File

@@ -152,30 +152,12 @@ pub struct MikanBangumiMeta {
pub fansub: String,
}
#[async_graphql::Object]
impl MikanBangumiMeta {
async fn homepage(&self) -> &str {
self.homepage.as_str()
}
async fn origin_poster_src(&self) -> Option<&str> {
self.origin_poster_src.as_ref().map(|url| url.as_str())
}
async fn bangumi_title(&self) -> &str {
&self.bangumi_title
}
async fn mikan_bangumi_id(&self) -> &str {
&self.mikan_bangumi_id
}
async fn mikan_fansub_id(&self) -> &str {
&self.mikan_fansub_id
}
async fn fansub(&self) -> &str {
&self.fansub
pub fn bangumi_hash(&self) -> MikanBangumiHash {
MikanBangumiHash {
mikan_bangumi_id: self.mikan_bangumi_id.clone(),
mikan_fansub_id: self.mikan_fansub_id.clone(),
}
}
}

View File

@@ -1,8 +1,21 @@
use async_graphql::dynamic::{ObjectAccessor, TypeRef};
use async_graphql::{
InputObject, InputValueResult, Scalar, ScalarType,
dynamic::{ObjectAccessor, SchemaError, TypeRef},
};
use itertools::Itertools;
use maplit::btreeset;
use once_cell::sync::OnceCell;
use sea_orm::{ColumnTrait, Condition, EntityTrait, Value};
use seaography::{BuilderContext, FilterInfo, FilterOperation, SeaResult};
use sea_orm::{
ColumnTrait, Condition, EntityTrait,
prelude::Expr,
sea_query::{self, IntoCondition, SimpleExpr, extension::postgres::PgExpr},
};
use seaography::{
BuilderContext, FilterInfo, FilterOperation as SeaographyFilterOperation, SeaResult,
};
use serde_json::Value;
use crate::errors::{RecorderError, RecorderResult};
pub static SUBSCRIBER_ID_FILTER_INFO: OnceCell<FilterInfo> = OnceCell::new();
@@ -10,7 +23,7 @@ pub fn init_custom_filter_info() {
SUBSCRIBER_ID_FILTER_INFO.get_or_init(|| FilterInfo {
type_name: String::from("SubscriberIdFilterInput"),
base_type: TypeRef::INT.into(),
supported_operations: btreeset! { FilterOperation::Equals },
supported_operations: btreeset! { SeaographyFilterOperation::Equals },
});
}
@@ -31,10 +44,10 @@ where
let operations = &subscriber_id_filter_info.supported_operations;
for operation in operations {
match operation {
FilterOperation::Equals => {
SeaographyFilterOperation::Equals => {
if let Some(value) = filter.get("eq") {
let value: i32 = value.i64()?.try_into()?;
let value = Value::Int(Some(value));
let value = sea_orm::Value::Int(Some(value));
condition = condition.add(column.eq(value));
}
}
@@ -44,3 +57,441 @@ where
Ok(condition)
})
}
#[derive(Clone, Debug, InputObject)]
pub struct StringFilterInput {
pub eq: Option<Value>,
pub ne: Option<Value>,
pub gt: Option<Value>,
pub gte: Option<Value>,
pub lt: Option<Value>,
pub lte: Option<Value>,
pub in_: Option<Vec<Value>>,
pub nin: Option<Vec<Value>>,
pub is_null: Option<bool>,
pub is_not_null: Option<bool>,
pub contains: Option<Value>,
pub starts_with: Option<Value>,
pub ends_with: Option<Value>,
pub like: Option<Value>,
pub not_like: Option<Value>,
pub between: Option<Value>,
pub not_between: Option<Value>,
}
#[derive(Clone, Debug, InputObject)]
pub struct TextFilterInput {
pub eq: Option<Value>,
pub ne: Option<Value>,
pub gt: Option<Value>,
pub gte: Option<Value>,
pub lt: Option<Value>,
pub lte: Option<Value>,
pub in_: Option<Vec<Value>>,
pub nin: Option<Vec<Value>>,
pub is_null: Option<bool>,
pub between: Option<Value>,
pub not_between: Option<Value>,
}
#[derive(Clone, Debug, InputObject)]
pub struct IntFilterInput {
pub eq: Option<Value>,
pub ne: Option<Value>,
pub gt: Option<Value>,
pub gte: Option<Value>,
pub lt: Option<Value>,
pub lte: Option<Value>,
pub in_: Option<Vec<Value>>,
pub nin: Option<Vec<Value>>,
pub is_null: Option<bool>,
pub is_not_null: Option<bool>,
pub between: Option<Value>,
pub not_between: Option<Value>,
}
#[derive(Clone, Debug, InputObject)]
pub struct FloatFilterInput {
pub eq: Option<Value>,
pub ne: Option<Value>,
pub gt: Option<Value>,
pub gte: Option<Value>,
pub lt: Option<Value>,
pub lte: Option<Value>,
pub in_: Option<Vec<Value>>,
pub nin: Option<Vec<Value>>,
pub is_null: Option<bool>,
pub is_not_null: Option<bool>,
pub between: Option<Value>,
pub not_between: Option<Value>,
}
#[derive(Clone, Debug, InputObject)]
pub struct BooleanFilterInput {
pub eq: Option<Value>,
pub ne: Option<Value>,
pub gt: Option<Value>,
pub gte: Option<Value>,
pub lt: Option<Value>,
pub lte: Option<Value>,
pub in_: Option<Vec<Value>>,
pub nin: Option<Vec<Value>>,
pub is_null: Option<bool>,
pub is_not_null: Option<bool>,
}
#[derive(Clone, Debug, InputObject)]
pub struct JsonArrayFilterInput {
pub is_null: Option<bool>,
pub is_not_null: Option<bool>,
pub contains: Option<Value>,
}
#[derive(Clone, Debug)]
pub struct JsonFilterInput(pub serde_json::Value);
#[Scalar(name = "JsonFilterInput")]
impl ScalarType for JsonFilterInput {
fn parse(value: async_graphql::Value) -> InputValueResult<Self> {
Ok(JsonFilterInput(value.into_json()?))
}
fn to_value(&self) -> async_graphql::Value {
async_graphql::Value::from_json(self.0.clone()).unwrap()
}
}
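A hedged example of the document shape this scalar is meant to carry: plain keys are JSON path segments, `$`-prefixed keys are the operations enumerated below.

    // Matches rows where job->'a'->'b' equals 1 and key "c" exists.
    let filter = JsonFilterInput(serde_json::json!({
        "a": { "b": { "$eq": 1 } },
        "c": { "$exists": true }
    }));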
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
pub enum JsonFilterOperation {
Equals,
NotEquals,
GreaterThan,
GreaterThanEquals,
LessThan,
LessThanEquals,
IsIn,
IsNotIn,
IsNull,
IsNotNull,
Contains,
StartsWith,
EndsWith,
Like,
NotLike,
Exists,
NotExists,
Between,
NotBetween,
And,
Or,
}
impl JsonFilterOperation {
pub fn is_filter_operation(property_key: &str) -> bool {
property_key.starts_with("$")
}
pub fn parse_str(value: &str) -> Result<Option<Self>, async_graphql::dynamic::SchemaError> {
match value {
"$eq" => Ok(Some(JsonFilterOperation::Equals)),
"$ne" => Ok(Some(JsonFilterOperation::NotEquals)),
"$gt" => Ok(Some(JsonFilterOperation::GreaterThan)),
"$gte" => Ok(Some(JsonFilterOperation::GreaterThanEquals)),
"$lt" => Ok(Some(JsonFilterOperation::LessThan)),
"$lte" => Ok(Some(JsonFilterOperation::LessThanEquals)),
"$is_in" => Ok(Some(JsonFilterOperation::IsIn)),
"$is_not_in" => Ok(Some(JsonFilterOperation::IsNotIn)),
"$is_null" => Ok(Some(JsonFilterOperation::IsNull)),
"$is_not_null" => Ok(Some(JsonFilterOperation::IsNotNull)),
"$contains" => Ok(Some(JsonFilterOperation::Contains)),
"$starts_with" => Ok(Some(JsonFilterOperation::StartsWith)),
"$ends_with" => Ok(Some(JsonFilterOperation::EndsWith)),
"$like" => Ok(Some(JsonFilterOperation::Like)),
"$not_like" => Ok(Some(JsonFilterOperation::NotLike)),
"$between" => Ok(Some(JsonFilterOperation::Between)),
"$not_between" => Ok(Some(JsonFilterOperation::NotBetween)),
"$and" => Ok(Some(JsonFilterOperation::And)),
"$or" => Ok(Some(JsonFilterOperation::Or)),
"$exists" => Ok(Some(JsonFilterOperation::Exists)),
"$not_exists" => Ok(Some(JsonFilterOperation::NotExists)),
s if Self::is_filter_operation(s) => Err(async_graphql::dynamic::SchemaError(format!(
"Use reserved but not implemented filter operation: {value}"
))),
_ => Ok(None),
}
}
}
impl AsRef<str> for JsonFilterOperation {
fn as_ref(&self) -> &str {
match self {
JsonFilterOperation::Equals => "$eq",
JsonFilterOperation::NotEquals => "$ne",
JsonFilterOperation::GreaterThan => "$gt",
JsonFilterOperation::GreaterThanEquals => "$gte",
JsonFilterOperation::LessThan => "$lt",
JsonFilterOperation::LessThanEquals => "$lte",
JsonFilterOperation::IsIn => "$is_in",
JsonFilterOperation::IsNotIn => "$is_not_in",
JsonFilterOperation::IsNull => "$is_null",
JsonFilterOperation::IsNotNull => "$is_not_null",
JsonFilterOperation::Contains => "$contains",
JsonFilterOperation::StartsWith => "$starts_with",
JsonFilterOperation::EndsWith => "$ends_with",
JsonFilterOperation::Like => "$like",
JsonFilterOperation::NotLike => "$not_like",
JsonFilterOperation::Between => "$between",
JsonFilterOperation::NotBetween => "$not_between",
JsonFilterOperation::And => "$and",
JsonFilterOperation::Or => "$or",
JsonFilterOperation::Exists => "$exists",
JsonFilterOperation::NotExists => "$not_exists",
}
}
}
fn build_json_leaf_get_expr(
expr: impl Into<SimpleExpr>,
path: &[&str],
) -> RecorderResult<SimpleExpr> {
if path.is_empty() {
Err(async_graphql::dynamic::SchemaError(
"JsonFilterInput path must be at least one level deep".to_string(),
))?
}
let mut expr = expr.into();
for key in path {
expr = expr.get_json_field(*key);
}
Ok(expr)
}
fn build_json_leaf_cast_expr(
expr: impl Into<SimpleExpr>,
path: &[&str],
) -> RecorderResult<SimpleExpr> {
if path.is_empty() {
Err(async_graphql::dynamic::SchemaError(
"JsonFilterInput path must be at least one level deep".to_string(),
))?
}
let mut expr = expr.into();
for key in path.iter().take(path.len() - 1) {
expr = expr.get_json_field(*key);
}
expr = expr.cast_json_field(path[path.len() - 1]);
Ok(expr)
}
fn build_json_path_expr(path: &[&str]) -> SimpleExpr {
Expr::val(format!("$.{}", path.join("."))).into()
}
fn build_json_path_exists_expr(col_expr: impl Into<SimpleExpr>, path: &[&str]) -> SimpleExpr {
Expr::cust_with_exprs(
"JSON_EXISTS($1, $2)",
[col_expr.into(), build_json_path_expr(path)],
)
}
fn build_json_path_query_expr(col: impl Into<SimpleExpr>, path: &[&str]) -> SimpleExpr {
// Extract the value at `path` with Postgres jsonb_path_query.
Expr::cust_with_exprs(
"jsonb_path_query($1, $2)",
[col.into(), build_json_path_expr(path)],
)
}
fn build_json_value_is_in_expr(
col_expr: impl Into<SimpleExpr>,
path: &[&str],
values: Vec<Value>,
) -> RecorderResult<SimpleExpr> {
let template = format!(
"jsonb_path_query($1, $2) = ANY(ARRAY[{}]::jsonb[])",
(0..values.len())
.map(|i| format!("${}::jsonb", i + 3))
.join(",")
);
let values = values
.into_iter()
.map(|v| serde_json::to_string(&v))
.collect::<Result<Vec<_>, _>>()?;
let mut exprs = vec![col_expr.into(), build_json_path_expr(path)];
exprs.extend(values.into_iter().map(|v| Expr::val(v).into()));
Ok(Expr::cust_with_exprs(template, exprs))
}
fn prepare_json_leaf_condition(
col_expr: impl Into<SimpleExpr>,
op: JsonFilterOperation,
value: Value,
path: &[&str],
) -> RecorderResult<Condition> {
Ok(match (op, value) {
(
op @ (JsonFilterOperation::Exists | JsonFilterOperation::NotExists),
Value::Bool(exists),
) => {
let json_exists_expr = build_json_path_exists_expr(col_expr, path);
if (op == JsonFilterOperation::Exists && exists)
|| (op == JsonFilterOperation::NotExists && !exists)
{
json_exists_expr.into_condition()
} else {
json_exists_expr.not().into_condition()
}
}
(JsonFilterOperation::Exists | JsonFilterOperation::NotExists, _) => {
Err(SchemaError(
"JsonFilterInput leaf cannot be $exists or $not_exists with a non-boolean value"
.to_string(),
))?
}
(JsonFilterOperation::And | JsonFilterOperation::Or, _) => {
unreachable!("JsonFilterInput leaf can not be $and or $or with any value")
}
(JsonFilterOperation::Equals, value) => {
let expr = build_json_leaf_get_expr(col_expr, path)?;
expr.eq(value).into_condition()
}
(JsonFilterOperation::NotEquals, value) => {
let expr = build_json_leaf_get_expr(col_expr, path)?;
expr.ne(value).into_condition()
}
(
JsonFilterOperation::GreaterThan
| JsonFilterOperation::GreaterThanEquals
| JsonFilterOperation::LessThan
| JsonFilterOperation::LessThanEquals,
Value::Array(_),
) => Err(SchemaError(format!(
"JsonFilterInput leaf can not be {} with an array",
op.as_ref()
)))?,
(_, _) => todo!(),
})
}
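A usage sketch for the leaf builder above; the column and path are illustrative, and `TestTable` mirrors the test module further down.

    // Inside a function returning RecorderResult<_>:
    // builds a condition roughly equivalent to `job->'a'->'b' = '1'::jsonb`.
    let condition = prepare_json_leaf_condition(
        Expr::col((TestTable::Table, TestTable::Job)),
        JsonFilterOperation::Equals,
        serde_json::json!(1),
        &["a", "b"],
    )?;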
// fn recursive_prepare_json_node_condition<'a, E>(
// expr: &'a E,
// node: Value,
// mut path: Vec<&'a str>,
// ) -> RecorderResult<(Condition, Vec<&'a str>)>
// where
// E: Into<SimpleExpr> + Clone,
// {
// let object = node.as_object().ok_or(SchemaError(format!(
// "Json filter input node must be an object"
// )))?;
// let mut conditions = Condition::all();
// for (key, value) in object {
// if let Some(operation) = JsonFilterOperation::parse_str(key)? {
// match operation {
// JsonFilterOperation::And => {
// let mut condition = Condition::all();
// let filters = value.as_array().ok_or(SchemaError(format!(
// "$and operation must be an array of sub filters"
// )))?;
// for filter in filters {
// let result =
// recursive_prepare_json_node_condition(expr, filter, path)?;
// condition = condition.add(result.0); path = result.1;
// }
// conditions = conditions.add(condition);
// }
// JsonFilterOperation::Between => {
// let mut condition = Condition::any();
// let values = value
// .as_array()
// .and_then(|arr| if arr.len() == 2 { Some(arr) } else
// { None }) .ok_or(SchemaError(format!(
// "$between operation must be an array of two
// values" )))?;
// let (lhs, rhs) = (values[0], values[1]);
// let (lcondition, lpath) =
// recursive_prepare_json_node_condition(expr, lhs,
// path)?; condition = condition.add(lcondition);
// let (rcondition, rpath) =
// recursive_prepare_json_node_condition(expr, rhs,
// lpath)?; condition = condition.add(rcondition);
// path = rpath;
// conditions = conditions.add(condition);
// }
// op => conditions.add(prepare_json_leaf_condition(expr, op,
// value, &path)?), }
// } else {
// path.push(key as &'a str);
// let result = recursive_prepare_json_node_condition(expr, node,
// path)?; conditions = conditions.add(result.0);
// path = result.1;
// path.pop();
// }
// }
// Ok((conditions, path))
// }
#[cfg(test)]
mod tests {
use sea_orm::{
DeriveIden,
sea_query::{PostgresQueryBuilder, Query, Value, Values},
};
use super::*;
#[derive(DeriveIden)]
enum TestTable {
Table,
Job,
}
fn build_test_query_sql(where_expr: SimpleExpr) -> (String, Vec<Value>) {
let (sql, Values(values)) = Query::select()
.column(TestTable::Job)
.and_where(where_expr)
.from(TestTable::Table)
.build(PostgresQueryBuilder);
(sql, values)
}
#[test]
fn test_build_json_path_exists_expr() {
let (sql, params) = build_test_query_sql(build_json_path_exists_expr(
Expr::col((TestTable::Table, TestTable::Job)),
&["a", "b", "c"],
));
assert_eq!(
sql,
"SELECT \"job\" FROM \"test_table\" WHERE JSON_EXISTS(\"test_table\".\"job\", $1)"
);
let expected_params = vec![Value::String(Some(Box::new("$.a.b.c".into())))];
assert_eq!(params, expected_params);
}
#[test]
fn test_build_json_path_query_expr() -> RecorderResult<()> {
let (sql, params) = build_test_query_sql(build_json_value_is_in_expr(
Expr::col((TestTable::Table, TestTable::Job)),
&["a", "b", "c"],
vec![
serde_json::json!(1),
serde_json::json!("str"),
serde_json::json!(true),
],
)?);
assert_eq!(
sql,
"SELECT \"job\" FROM \"test_table\" WHERE jsonb_path_query(\"test_table\".\"job\", \
$1) = ANY(ARRAY[$3::jsonb,$4::jsonb,$5::jsonb]::jsonb[])"
);
Ok(())
}
}

View File

@@ -1,4 +1,6 @@
pub mod filter;
pub mod guard;
pub mod pagination;
pub mod transformer;
pub mod util;
pub mod order;

View File

View File

@@ -0,0 +1,36 @@
use async_graphql::{InputObject, SimpleObject};
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, InputObject)]
pub struct CursorInput {
pub cursor: Option<String>,
pub limit: u64,
}
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, InputObject)]
pub struct PageInput {
pub page: u64,
pub limit: u64,
}
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, InputObject)]
pub struct OffsetInput {
pub offset: u64,
pub limit: u64,
}
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, InputObject)]
pub struct PaginationInput {
pub cursor: Option<CursorInput>,
pub page: Option<PageInput>,
pub offset: Option<OffsetInput>,
}
pub type PageInfo = async_graphql::connection::PageInfo;
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, SimpleObject)]
pub struct PaginationInfo {
pub pages: u64,
pub current: u64,
pub offset: u64,
pub total: u64,
}
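A minimal construction sketch; the convention that exactly one of the three variants is set is an assumption here, not enforced by the types:

    // Offset-based pagination: rows 0..20.
    let pagination = PaginationInput {
        cursor: None,
        page: None,
        offset: Some(OffsetInput { offset: 0, limit: 20 }),
    };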

View File

@@ -79,6 +79,13 @@ pub fn schema(
let context = CONTEXT.get_or_init(|| {
let mut context = BuilderContext::default();
context.pagination_input.type_name = "SeaographyPaginationInput".to_string();
context.pagination_info_object.type_name = "SeaographyPaginationInfo".to_string();
context.cursor_input.type_name = "SeaographyCursorInput".to_string();
context.offset_input.type_name = "SeaographyOffsetInput".to_string();
context.page_input.type_name = "SeaographyPageInput".to_string();
context.page_info_object.type_name = "SeaographyPageInfo".to_string();
restrict_subscriber_for_entity::<bangumi::Entity>(
&mut context,
&bangumi::Column::SubscriberId,

View File

@@ -1 +1,2 @@
mod subscription;
mod task;

View File

@@ -1,12 +1,10 @@
use std::sync::Arc;
use async_graphql::{Context, InputObject, Object, Result as GraphQLResult, SimpleObject};
use sea_orm::{DbErr, EntityTrait};
use crate::{
app::AppContextTrait,
auth::AuthUserInfo,
errors::RecorderError,
models::subscriptions::{self, SubscriptionTrait},
task::SubscriberTaskPayload,
};
@@ -25,7 +23,7 @@ struct SyncOneSubscriptionTaskOutput {
#[Object]
impl SubscriptionMutation {
async fn sync_one_subscription_feeds(
async fn sync_one_subscription_feeds_incremental(
&self,
ctx: &Context<'_>,
input: SyncOneSubscriptionFilterInput,
@@ -35,24 +33,12 @@ impl SubscriptionMutation {
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
let subscription_model = subscriptions::Entity::find_by_id(input.subscription_id)
.one(app_ctx.db())
.await?
.ok_or_else(|| RecorderError::DbError {
source: DbErr::RecordNotFound(format!(
"Subscription id = {} not found",
input.subscription_id
)),
})?;
if subscription_model.subscriber_id != subscriber_id {
Err(RecorderError::DbError {
source: DbErr::RecordNotFound(format!(
"Subscription id = {} not found",
input.subscription_id
)),
})?;
}
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
app_ctx.as_ref(),
input.subscription_id,
subscriber_id,
)
.await?;
let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?;
@@ -61,7 +47,40 @@ impl SubscriptionMutation {
let task_id = task_service
.add_subscriber_task(
auth_user_info.subscriber_auth.subscriber_id,
SubscriberTaskPayload::SyncOneSubscriptionFeeds(subscription.into()),
SubscriberTaskPayload::SyncOneSubscriptionFeedsIncremental(subscription.into()),
)
.await?;
Ok(SyncOneSubscriptionTaskOutput {
task_id: task_id.to_string(),
})
}
async fn sync_one_subscription_feeds_full(
&self,
ctx: &Context<'_>,
input: SyncOneSubscriptionFilterInput,
) -> GraphQLResult<SyncOneSubscriptionTaskOutput> {
let auth_user_info = ctx.data::<AuthUserInfo>()?;
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
app_ctx.as_ref(),
input.subscription_id,
subscriber_id,
)
.await?;
let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?;
let task_service = app_ctx.task();
let task_id = task_service
.add_subscriber_task(
auth_user_info.subscriber_auth.subscriber_id,
SubscriberTaskPayload::SyncOneSubscriptionFeedsFull(subscription.into()),
)
.await?;
@@ -80,24 +99,12 @@ impl SubscriptionMutation {
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
let subscription_model = subscriptions::Entity::find_by_id(input.subscription_id)
.one(app_ctx.db())
.await?
.ok_or_else(|| RecorderError::DbError {
source: DbErr::RecordNotFound(format!(
"Subscription id = {} not found",
input.subscription_id
)),
})?;
if subscription_model.subscriber_id != subscriber_id {
Err(RecorderError::DbError {
source: DbErr::RecordNotFound(format!(
"Subscription id = {} not found",
input.subscription_id
)),
})?;
}
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
app_ctx.as_ref(),
input.subscription_id,
subscriber_id,
)
.await?;
let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?;

View File

@@ -0,0 +1,27 @@
use std::sync::Arc;
use async_graphql::{Context, InputObject, Object, Result as GraphQLResult};
use crate::{app::AppContextTrait, auth::AuthUserInfo};
struct TaskQuery;
#[derive(InputObject)]
struct SubscriberTasksFilterInput {
pub subscription_id: Option<i32>,
pub task_id: Option<String>,
pub task_type: Option<String>,
}
#[Object]
impl TaskQuery {
async fn subscriber_tasks(&self, ctx: &Context<'_>) -> GraphQLResult<Vec<String>> {
let auth_user_info = ctx.data::<AuthUserInfo>()?;
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
let task_service = app_ctx.task();
todo!()
}
}

View File

@@ -32,7 +32,6 @@ pub enum Subscriptions {
SubscriberId,
Category,
SourceUrl,
SourceUrls,
Enabled,
CredentialId,
}

View File

@@ -64,10 +64,6 @@ impl MigrationTrait for Migration {
.col(string(Subscriptions::DisplayName))
.col(integer(Subscriptions::SubscriberId))
.col(text(Subscriptions::SourceUrl))
.col(array_null(
Subscriptions::SourceUrls,
ColumnType::String(StringLen::None),
))
.col(boolean(Subscriptions::Enabled))
.col(enumeration(
Subscriptions::Category,

View File

@@ -315,4 +315,24 @@ impl Model {
)
}))
}
pub async fn get_subscribed_bangumi_list_from_subscription(
ctx: &dyn AppContextTrait,
subscription_id: i32,
) -> RecorderResult<Vec<Self>> {
let db = ctx.db();
let bangumi_list = Entity::find()
.filter(
Condition::all()
.add(subscription_bangumi::Column::SubscriptionId.eq(subscription_id)),
)
.join_rev(
JoinType::InnerJoin,
subscription_bangumi::Relation::Bangumi.def(),
)
.all(db)
.await?;
Ok(bangumi_list)
}
}

View File

@@ -5,6 +5,7 @@ pub mod downloaders;
pub mod downloads;
pub mod episodes;
pub mod query;
pub mod subscriber_tasks;
pub mod subscribers;
pub mod subscription_bangumi;
pub mod subscription_episode;

View File

@@ -0,0 +1,18 @@
use sea_orm::entity::prelude::*;
use crate::task::SubscriberTask;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "subscriber_tasks")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub subscriber_id: i32,
pub job: SubscriberTask,
pub state: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View File

@@ -45,7 +45,6 @@ pub struct Model {
pub subscriber_id: i32,
pub category: SubscriptionCategory,
pub source_url: String,
pub source_urls: Option<Vec<String>>,
pub enabled: bool,
pub credential_id: Option<i32>,
}
@@ -176,14 +175,32 @@ impl Model {
Ok(())
}
pub async fn sync_feeds(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let subscription = self.try_into()?;
match subscription {
Subscription::MikanSubscriber(subscription) => subscription.sync_feeds(ctx).await,
Subscription::MikanSeason(subscription) => subscription.sync_feeds(ctx).await,
Subscription::MikanBangumi(subscription) => subscription.sync_feeds(ctx).await,
Subscription::Manual => Ok(()),
pub async fn find_by_id_and_subscriber_id(
ctx: &dyn AppContextTrait,
subscription_id: i32,
subscriber_id: i32,
) -> RecorderResult<Self> {
let db = ctx.db();
let subscription_model = Entity::find_by_id(subscription_id)
.one(db)
.await?
.ok_or_else(|| RecorderError::DbError {
source: DbErr::RecordNotFound(format!(
"Subscription id {subscription_id} not found or not belong to subscriber \
{subscriber_id}",
)),
})?;
if subscription_model.subscriber_id != subscriber_id {
Err(RecorderError::DbError {
source: DbErr::RecordNotFound(format!(
"Subscription id {subscription_id} not found or not belong to subscriber \
{subscriber_id}",
)),
})?;
}
Ok(subscription_model)
}
}
@@ -193,7 +210,9 @@ pub trait SubscriptionTrait: Sized + Debug {
fn get_subscription_id(&self) -> i32;
async fn sync_feeds(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()>;
async fn sync_feeds_incremental(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()>;
async fn sync_feeds_full(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()>;
async fn sync_sources(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()>;
@@ -244,11 +263,20 @@ impl SubscriptionTrait for Subscription {
}
}
async fn sync_feeds(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
async fn sync_feeds_incremental(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
match self {
Self::MikanSubscriber(subscription) => subscription.sync_feeds(ctx).await,
Self::MikanSeason(subscription) => subscription.sync_feeds(ctx).await,
Self::MikanBangumi(subscription) => subscription.sync_feeds(ctx).await,
Self::MikanSubscriber(subscription) => subscription.sync_feeds_incremental(ctx).await,
Self::MikanSeason(subscription) => subscription.sync_feeds_incremental(ctx).await,
Self::MikanBangumi(subscription) => subscription.sync_feeds_incremental(ctx).await,
Self::Manual => Ok(()),
}
}
async fn sync_feeds_full(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
match self {
Self::MikanSubscriber(subscription) => subscription.sync_feeds_full(ctx).await,
Self::MikanSeason(subscription) => subscription.sync_feeds_full(ctx).await,
Self::MikanBangumi(subscription) => subscription.sync_feeds_full(ctx).await,
Self::Manual => Ok(()),
}
}

View File

@@ -1,5 +1,6 @@
use std::sync::Arc;
use apalis::prelude::State;
use futures::Stream;
use serde::{Serialize, de::DeserializeOwned};

View File

@@ -7,7 +7,7 @@ pub use core::{SUBSCRIBER_TASK_APALIS_NAME, SubscriberAsyncTaskTrait, Subscriber
pub use config::TaskConfig;
pub use registry::{
SubscriberTask, SubscriberTaskPayload, SyncOneSubscriptionFeedsTask,
SubscriberTask, SubscriberTaskPayload, SyncOneSubscriptionFeedsFullTask,
SyncOneSubscriptionFeedsIncrementalTask, SyncOneSubscriptionSourcesTask,
};
pub use service::TaskService;

View File

@@ -1,8 +1,12 @@
mod subscription;
use std::sync::Arc;
use sea_orm::FromJsonQueryResult;
use serde::{Deserialize, Serialize};
pub use subscription::{SyncOneSubscriptionFeedsTask, SyncOneSubscriptionSourcesTask};
pub use subscription::{
SyncOneSubscriptionFeedsFullTask, SyncOneSubscriptionFeedsIncrementalTask,
SyncOneSubscriptionSourcesTask,
};
use super::SubscriberAsyncTaskTrait;
use crate::{
@@ -10,11 +14,26 @@ use crate::{
errors::{RecorderError, RecorderResult},
};
#[derive(async_graphql::Enum, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Copy)]
pub enum SubscriberTaskType {
#[serde(rename = "sync_one_subscription_feeds_incremental")]
#[graphql(name = "sync_one_subscription_feeds_incremental")]
SyncOneSubscriptionFeedsIncremental,
#[serde(rename = "sync_one_subscription_feeds_full")]
#[graphql(name = "sync_one_subscription_feeds_full")]
SyncOneSubscriptionFeedsFull,
#[serde(rename = "sync_one_subscription_sources")]
#[graphql(name = "sync_one_subscription_sources")]
SyncOneSubscriptionSources,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "task_type")]
pub enum SubscriberTaskPayload {
#[serde(rename = "sync_one_subscription_feeds")]
SyncOneSubscriptionFeeds(SyncOneSubscriptionFeedsTask),
#[serde(rename = "sync_one_subscription_feeds_incremental")]
SyncOneSubscriptionFeedsIncremental(SyncOneSubscriptionFeedsIncrementalTask),
#[serde(rename = "sync_one_subscription_feeds_full")]
SyncOneSubscriptionFeedsFull(SyncOneSubscriptionFeedsFullTask),
#[serde(rename = "sync_one_subscription_sources")]
SyncOneSubscriptionSources(SyncOneSubscriptionSourcesTask),
}
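With `#[serde(tag = "task_type")]`, each payload serializes with its variant name as an inline discriminator; a sketch of the expected shape, assuming the inner subscription serializes as a map alongside the tag:

    // Assuming `payload` is SubscriberTaskPayload::SyncOneSubscriptionFeedsFull(..):
    let json = serde_json::to_value(&payload).unwrap();
    assert_eq!(json["task_type"], "sync_one_subscription_feeds_full");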
@@ -22,10 +41,23 @@ pub enum SubscriberTaskPayload {
impl SubscriberTaskPayload {
pub async fn run(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
match self {
Self::SyncOneSubscriptionFeeds(task) => task.run(ctx).await,
Self::SyncOneSubscriptionFeedsIncremental(task) => task.run(ctx).await,
Self::SyncOneSubscriptionFeedsFull(task) => task.run(ctx).await,
Self::SyncOneSubscriptionSources(task) => task.run(ctx).await,
}
}
pub fn task_type(&self) -> SubscriberTaskType {
match self {
Self::SyncOneSubscriptionFeedsIncremental(_) => {
SubscriberTaskType::SyncOneSubscriptionFeedsIncremental
}
Self::SyncOneSubscriptionFeedsFull(_) => {
SubscriberTaskType::SyncOneSubscriptionFeedsFull
}
Self::SyncOneSubscriptionSources(_) => SubscriberTaskType::SyncOneSubscriptionSources,
}
}
}
impl TryFrom<&SubscriberTaskPayload> for serde_json::Value {
@@ -45,7 +77,7 @@ impl TryFrom<&SubscriberTaskPayload> for serde_json::Value {
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, FromJsonQueryResult)]
pub struct SubscriberTask {
pub subscriber_id: i32,
#[serde(flatten)]

View File

@@ -11,18 +11,35 @@ use crate::{
};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SyncOneSubscriptionFeedsTask(pub subscriptions::Subscription);
pub struct SyncOneSubscriptionFeedsIncrementalTask(pub subscriptions::Subscription);
impl From<subscriptions::Subscription> for SyncOneSubscriptionFeedsTask {
impl From<subscriptions::Subscription> for SyncOneSubscriptionFeedsIncrementalTask {
fn from(subscription: subscriptions::Subscription) -> Self {
Self(subscription)
}
}
#[async_trait::async_trait]
impl SubscriberAsyncTaskTrait for SyncOneSubscriptionFeedsTask {
impl SubscriberAsyncTaskTrait for SyncOneSubscriptionFeedsIncrementalTask {
async fn run_async(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
self.0.sync_feeds(ctx).await?;
self.0.sync_feeds_incremental(ctx).await?;
Ok(())
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SyncOneSubscriptionFeedsFullTask(pub subscriptions::Subscription);
impl From<subscriptions::Subscription> for SyncOneSubscriptionFeedsFullTask {
fn from(subscription: subscriptions::Subscription) -> Self {
Self(subscription)
}
}
#[async_trait::async_trait]
impl SubscriberAsyncTaskTrait for SyncOneSubscriptionFeedsFullTask {
async fn run_async(self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
self.0.sync_feeds_full(ctx).await?;
Ok(())
}
}

View File

@@ -13,7 +13,7 @@ use crate::{
pub struct TaskService {
pub config: TaskConfig,
ctx: Arc<dyn AppContextTrait>,
subscriber_task_storage: Arc<RwLock<PostgresStorage<SubscriberTask>>>,
pub subscriber_task_storage: Arc<RwLock<PostgresStorage<SubscriberTask>>>,
}
impl TaskService {