feat: try views and seaography

This commit is contained in:
master 2025-06-15 05:02:23 +08:00
parent a2254bbe80
commit 7eb4e41708
12 changed files with 257 additions and 15 deletions

1
Cargo.lock generated
View File

@ -5217,6 +5217,7 @@ dependencies = [
"clap",
"cocoon",
"color-eyre",
"convert_case 0.8.0",
"ctor",
"dotenvy",
"downloader",

View File

@ -55,6 +55,9 @@ moka = { workspace = true }
chrono = { workspace = true }
tracing-subscriber = { workspace = true }
mockito = { workspace = true }
color-eyre = { workspace = true, optional = true }
inquire = { workspace = true, optional = true }
convert_case = { workspace = true }
sea-orm = { version = "1.1", features = [
"sqlx-sqlite",
@ -124,8 +127,6 @@ rust_decimal = "1.37.1"
reqwest_cookie_store = "0.8.0"
nanoid = "0.4.0"
jwtk = "0.4.0"
color-eyre = { workspace = true, optional = true }
inquire = { workspace = true, optional = true }
percent-encoding = "2.3.1"

View File

@ -1,19 +1,95 @@
use seaography::{Builder as SeaographyBuilder, BuilderContext};
use std::{ops::Deref, pin::Pin, sync::Arc};
use async_graphql::dynamic::{ResolverContext, ValueAccessor};
use sea_orm::{
ConnectionTrait, EntityTrait, QueryFilter, QuerySelect, QueryTrait, prelude::Expr,
sea_query::Query,
};
use seaography::{Builder as SeaographyBuilder, BuilderContext, get_filter_conditions};
use crate::{
graphql::infra::json::restrict_jsonb_filter_input_for_entity, models::subscriber_tasks,
app::AppContextTrait,
errors::{RecorderError, RecorderResult},
graphql::{
domains::subscribers::restrict_subscriber_for_entity,
infra::{
custom::generate_custom_entity_delete_mutation_field,
json::{convert_jsonb_output_case_for_entity, restrict_jsonb_filter_input_for_entity},
},
},
models::subscriber_tasks,
task::ApalisJob,
};
pub fn register_subscriber_tasks_entity_mutations(builder: &mut SeaographyBuilder) {
let context = builder.context;
let delete_mutation = generate_custom_entity_delete_mutation_field::<subscriber_tasks::Entity>(
context,
Arc::new(
|resolver_ctx: &ResolverContext<'_>,
app_ctx: Arc<dyn AppContextTrait>,
filters: Option<ValueAccessor<'_>>|
-> Pin<Box<dyn Future<Output = RecorderResult<Option<i32>>> + Send>> {
let filters_condition = get_filter_conditions::<subscriber_tasks::Entity>(
resolver_ctx,
context,
filters,
);
Box::pin(async move {
let db = app_ctx.db();
let select_subquery = subscriber_tasks::Entity::find()
.select_only()
.column(subscriber_tasks::Column::Id)
.filter(filters_condition);
let delete_query = Query::delete()
.from_table(ApalisJob::Table)
.and_where(
Expr::col(ApalisJob::Id).in_subquery(select_subquery.into_query()),
)
.to_owned();
let db_backend = db.deref().get_database_backend();
let delete_statement = db_backend.build(&delete_query);
let result = db.execute(delete_statement).await?;
Ok::<Option<i32>, RecorderError>(Some(result.rows_affected() as i32))
})
as Pin<Box<dyn Future<Output = RecorderResult<Option<i32>>> + Send>>
},
),
);
builder.mutations.push(delete_mutation);
}
pub fn register_subscriber_tasks_to_schema_context(context: &mut BuilderContext) {
restrict_subscriber_for_entity::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::SubscriberId,
);
restrict_jsonb_filter_input_for_entity::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::Job,
);
convert_jsonb_output_case_for_entity::<subscriber_tasks::Entity>(
context,
&subscriber_tasks::Column::Job,
);
}
pub fn register_subscriber_tasks_to_schema_builder(
mut builder: SeaographyBuilder,
) -> SeaographyBuilder {
builder.register_entity::<subscriber_tasks::Entity>(
<subscriber_tasks::RelatedEntity as sea_orm::Iterable>::iter()
.map(|rel| seaography::RelationBuilder::get_relation(&rel, builder.context))
.collect(),
);
builder = builder.register_entity_dataloader_one_to_one(subscriber_tasks::Entity, tokio::spawn);
builder =
builder.register_entity_dataloader_one_to_many(subscriber_tasks::Entity, tokio::spawn);
builder.register_enumeration::<subscriber_tasks::SubscriberTaskType>();
builder.register_enumeration::<subscriber_tasks::SubscriberTaskStatus>();
builder
}

View File

@ -0,0 +1,79 @@
use std::{pin::Pin, sync::Arc};
use async_graphql::dynamic::{
Field, FieldFuture, InputValue, ResolverContext, TypeRef, ValueAccessor,
};
use sea_orm::EntityTrait;
use seaography::{
BuilderContext, EntityDeleteMutationBuilder, EntityObjectBuilder, FilterInputBuilder,
GuardAction,
};
use crate::{app::AppContextTrait, errors::RecorderResult};
pub type DeleteMutationFn = Arc<
dyn Fn(
&ResolverContext<'_>,
Arc<dyn AppContextTrait>,
Option<ValueAccessor<'_>>,
) -> Pin<Box<dyn Future<Output = RecorderResult<Option<i32>>> + Send>>
+ Send
+ Sync,
>;
pub fn generate_custom_entity_delete_mutation_field<T>(
builder_context: &'static BuilderContext,
mutation_fn: DeleteMutationFn,
) -> Field
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_filter_input_builder = FilterInputBuilder {
context: builder_context,
};
let entity_object_builder = EntityObjectBuilder {
context: builder_context,
};
let entity_delete_mutation_builder = EntityDeleteMutationBuilder {
context: builder_context,
};
let object_name: String = entity_object_builder.type_name::<T>();
let context = builder_context;
let guard = builder_context.guards.entity_guards.get(&object_name);
Field::new(
entity_delete_mutation_builder.type_name::<T>(),
TypeRef::named_nn(TypeRef::INT),
move |ctx| {
let mutation_fn = mutation_fn.clone();
FieldFuture::new(async move {
let guard_flag = if let Some(guard) = guard {
(*guard)(&ctx)
} else {
GuardAction::Allow
};
if let GuardAction::Block(reason) = guard_flag {
return Err::<Option<_>, async_graphql::Error>(async_graphql::Error::new(
reason.unwrap_or("Entity guard triggered.".into()),
));
}
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
let filters = ctx.args.get(&context.entity_delete_mutation.filter_field);
let result = mutation_fn(&ctx, app_ctx.clone(), filters).await?;
Ok(result.map(async_graphql::Value::from))
})
},
)
.argument(InputValue::new(
&context.entity_delete_mutation.filter_field,
TypeRef::named(entity_filter_input_builder.type_name(&object_name)),
))
}

View File

@ -3,6 +3,7 @@ use async_graphql::{
dynamic::{ResolverContext, Scalar, SchemaError},
to_value,
};
use convert_case::Case;
use itertools::Itertools;
use rust_decimal::{Decimal, prelude::FromPrimitive};
use sea_orm::{
@ -12,9 +13,13 @@ use sea_orm::{
use seaography::{
Builder as SeaographyBuilder, BuilderContext, FilterType, FnFilterCondition, SeaographyError,
};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::Value as JsonValue;
use crate::{errors::RecorderResult, graphql::infra::util::get_entity_column_key};
use crate::{
errors::RecorderResult, graphql::infra::util::get_entity_column_key,
infra::json::convert_json_keys,
};
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
pub enum JsonbFilterOperation {
@ -948,6 +953,64 @@ where
);
}
pub fn validate_jsonb_input_for_entity<T, S>(context: &mut BuilderContext, column: &T::Column)
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
S: DeserializeOwned + Serialize,
{
let entity_column_key = get_entity_column_key::<T>(context, column);
context.types.input_conversions.insert(
entity_column_key.clone(),
Box::new(move |_resolve_context, accessor| {
let deserialized = accessor.deserialize::<S>().map_err(|err| {
SeaographyError::TypeConversionError(
err.message,
format!("Json - {entity_column_key}"),
)
})?;
let json_value = serde_json::to_value(deserialized).map_err(|err| {
SeaographyError::TypeConversionError(
err.to_string(),
format!("Json - {entity_column_key}"),
)
})?;
Ok(sea_orm::Value::Json(Some(Box::new(json_value))))
}),
);
}
pub fn convert_jsonb_output_case_for_entity<T>(context: &mut BuilderContext, column: &T::Column)
where
T: EntityTrait,
<T as EntityTrait>::Model: Sync,
{
let entity_column_key = get_entity_column_key::<T>(context, column);
context.types.output_conversions.insert(
entity_column_key.clone(),
Box::new(move |value| {
if let sea_orm::Value::Json(Some(json)) = value {
let result = async_graphql::Value::from_json(convert_json_keys(
json.as_ref().clone(),
Case::Camel,
))
.map_err(|err| {
SeaographyError::TypeConversionError(
err.to_string(),
format!("Json - {entity_column_key}"),
)
})?;
Ok(result)
} else {
Err(SeaographyError::TypeConversionError(
"value should be json".to_string(),
format!("Json - {entity_column_key}"),
))
}
}),
);
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

View File

@ -1,2 +1,3 @@
pub mod custom;
pub mod json;
pub mod util;

View File

@ -42,10 +42,6 @@ pub fn build_schema(
register_subscribers_to_schema_context(&mut context);
{
restrict_subscriber_for_entity::<bangumi::Entity>(
&mut context,
&bangumi::Column::SubscriberId,
);
restrict_subscriber_for_entity::<downloaders::Entity>(
&mut context,
&downloaders::Column::SubscriberId,
@ -74,10 +70,6 @@ pub fn build_schema(
&mut context,
&subscription_episode::Column::SubscriberId,
);
restrict_subscriber_for_entity::<subscriber_tasks::Entity>(
&mut context,
&subscriber_tasks::Column::SubscriberId,
);
restrict_subscriber_for_entity::<credential_3rd::Entity>(
&mut context,
&credential_3rd::Column::SubscriberId,
@ -110,7 +102,6 @@ pub fn build_schema(
subscription_bangumi,
subscription_episode,
subscriptions,
subscriber_tasks,
credential_3rd
]
);
@ -121,7 +112,6 @@ pub fn build_schema(
builder.register_enumeration::<downloaders::DownloaderCategory>();
builder.register_enumeration::<downloads::DownloadMime>();
builder.register_enumeration::<credential_3rd::Credential3rdType>();
builder.register_enumeration::<subscriber_tasks::SubscriberTaskStatus>();
}
builder = register_subscriptions_to_schema_builder(builder);

View File

@ -0,0 +1,20 @@
use convert_case::{Case, Casing};
use serde_json::Value;
pub fn convert_json_keys(json: Value, case: Case) -> Value {
match json {
Value::Object(object) => Value::Object(
object
.into_iter()
.map(|(key, value)| (key.to_case(case), convert_json_keys(value, case)))
.collect(),
),
Value::Array(array) => Value::Array(
array
.into_iter()
.map(|item| convert_json_keys(item, case))
.collect(),
),
_ => json,
}
}

View File

@ -0,0 +1 @@
pub mod json;

View File

@ -20,6 +20,7 @@ pub mod database;
pub mod errors;
pub mod extract;
pub mod graphql;
pub mod infra;
pub mod logger;
pub mod message;
pub mod migrations;

View File

@ -0,0 +1,7 @@
use sea_orm::sea_query;
#[derive(sea_query::Iden)]
pub enum ApalisJob {
Table,
Id,
}

View File

@ -1,11 +1,13 @@
mod config;
mod core;
mod db;
mod registry;
mod service;
pub use core::{SUBSCRIBER_TASK_APALIS_NAME, SubscriberAsyncTaskTrait, SubscriberStreamTaskTrait};
pub use config::TaskConfig;
pub use db::ApalisJob;
pub use registry::{
SubscriberTask, SubscriberTaskType, SubscriberTaskTypeEnum, SubscriberTaskTypeVariant,
SubscriberTaskTypeVariantIter, SyncOneSubscriptionFeedsFullTask,