fix: fix graphql
This commit is contained in:
parent
b2f327d48f
commit
f1d8318500
@ -2,5 +2,5 @@
|
|||||||
recorder-playground = "run -p recorder --example playground -- --environment development"
|
recorder-playground = "run -p recorder --example playground -- --environment development"
|
||||||
|
|
||||||
[build]
|
[build]
|
||||||
rustflags = ["-Zthreads=8", "-Zshare-generics=y"]
|
#rustflags = ["-Zthreads=8", "-Zshare-generics=y"]
|
||||||
# rustflags = ["-Zthreads=8"]
|
rustflags = ["-Zthreads=8"]
|
||||||
|
24
Cargo.lock
generated
24
Cargo.lock
generated
@ -1188,6 +1188,15 @@ dependencies = [
|
|||||||
"unicode-segmentation",
|
"unicode-segmentation",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "convert_case"
|
||||||
|
version = "0.8.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "baaaa0ecca5b51987b9423ccdc971514dd8b0bb7b4060b983d3664dad3f1f89f"
|
||||||
|
dependencies = [
|
||||||
|
"unicode-segmentation",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cookie"
|
name = "cookie"
|
||||||
version = "0.18.1"
|
version = "0.18.1"
|
||||||
@ -3619,7 +3628,7 @@ version = "1.0.0-alpha.43"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "84c12744d1279367caed41739ef094c325d53fb0ffcd4f9b84a368796f870252"
|
checksum = "84c12744d1279367caed41739ef094c325d53fb0ffcd4f9b84a368796f870252"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"convert_case",
|
"convert_case 0.6.0",
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
"syn 1.0.109",
|
"syn 1.0.109",
|
||||||
@ -5175,6 +5184,7 @@ dependencies = [
|
|||||||
"typed-builder 0.21.0",
|
"typed-builder 0.21.0",
|
||||||
"url",
|
"url",
|
||||||
"util",
|
"util",
|
||||||
|
"util-derive",
|
||||||
"uuid",
|
"uuid",
|
||||||
"zune-image",
|
"zune-image",
|
||||||
]
|
]
|
||||||
@ -7585,6 +7595,18 @@ dependencies = [
|
|||||||
"snafu",
|
"snafu",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "util-derive"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"convert_case 0.8.0",
|
||||||
|
"darling",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"snafu",
|
||||||
|
"syn 2.0.101",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "uuid"
|
name = "uuid"
|
||||||
version = "1.16.0"
|
version = "1.16.0"
|
||||||
|
14
Cargo.toml
14
Cargo.toml
@ -2,6 +2,7 @@
|
|||||||
members = [
|
members = [
|
||||||
"packages/testing-torrents",
|
"packages/testing-torrents",
|
||||||
"packages/util",
|
"packages/util",
|
||||||
|
"packages/util-derive",
|
||||||
"packages/fetch",
|
"packages/fetch",
|
||||||
"packages/downloader",
|
"packages/downloader",
|
||||||
"apps/recorder",
|
"apps/recorder",
|
||||||
@ -9,6 +10,12 @@ members = [
|
|||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
|
testing-torrents = { path = "./packages/testing-torrents" }
|
||||||
|
util = { path = "./packages/util" }
|
||||||
|
util-derive = { path = "./packages/util-derive" }
|
||||||
|
fetch = { path = "./packages/fetch" }
|
||||||
|
downloader = { path = "./packages/downloader" }
|
||||||
|
|
||||||
moka = "0.12"
|
moka = "0.12"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
quirks_path = "0.1"
|
quirks_path = "0.1"
|
||||||
@ -40,11 +47,8 @@ reqwest = { version = "0.12", default-features = false, features = [
|
|||||||
] }
|
] }
|
||||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
|
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
|
||||||
axum-extra = "0.10"
|
axum-extra = "0.10"
|
||||||
|
mockito = { version = "1.6.1" }
|
||||||
testing-torrents = { path = "./packages/testing-torrents" }
|
convert_case = "0.8"
|
||||||
util = { path = "./packages/util" }
|
|
||||||
fetch = { path = "./packages/fetch" }
|
|
||||||
downloader = { path = "./packages/downloader" }
|
|
||||||
|
|
||||||
[patch.crates-io]
|
[patch.crates-io]
|
||||||
jwt-authorizer = { git = "https://github.com/blablacio/jwt-authorizer.git", rev = "e956774" }
|
jwt-authorizer = { git = "https://github.com/blablacio/jwt-authorizer.git", rev = "e956774" }
|
||||||
|
@ -25,6 +25,11 @@ testcontainers = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
downloader = { workspace = true }
|
||||||
|
util = { workspace = true }
|
||||||
|
util-derive = { workspace = true }
|
||||||
|
fetch = { workspace = true }
|
||||||
|
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
tokio = { workspace = true }
|
tokio = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
@ -49,7 +54,7 @@ serde_with = { workspace = true }
|
|||||||
moka = { workspace = true }
|
moka = { workspace = true }
|
||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
tracing-subscriber = { workspace = true }
|
tracing-subscriber = { workspace = true }
|
||||||
|
mockito = { workspace = true, optional = true }
|
||||||
|
|
||||||
sea-orm = { version = "1.1", features = [
|
sea-orm = { version = "1.1", features = [
|
||||||
"sqlx-sqlite",
|
"sqlx-sqlite",
|
||||||
@ -113,15 +118,12 @@ cocoon = { version = "0.4.3", features = ["getrandom", "thiserror"] }
|
|||||||
rand = "0.9.1"
|
rand = "0.9.1"
|
||||||
rust_decimal = "1.37.1"
|
rust_decimal = "1.37.1"
|
||||||
reqwest_cookie_store = "0.8.0"
|
reqwest_cookie_store = "0.8.0"
|
||||||
mockito = { version = "1.6.1", optional = true }
|
|
||||||
|
|
||||||
downloader = { workspace = true }
|
|
||||||
util = { workspace = true }
|
|
||||||
fetch = { workspace = true }
|
|
||||||
nanoid = "0.4.0"
|
nanoid = "0.4.0"
|
||||||
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
serial_test = "3"
|
serial_test = "3"
|
||||||
insta = { version = "1", features = ["redactions", "yaml", "filters"] }
|
insta = { version = "1", features = ["redactions", "yaml", "filters"] }
|
||||||
rstest = "0.25"
|
rstest = "0.25"
|
||||||
ctor = "0.4.0"
|
ctor = "0.4.0"
|
||||||
|
mockito = { workspace = true }
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -1,10 +1,11 @@
|
|||||||
mod json;
|
mod json;
|
||||||
mod subscriber;
|
mod subscriber;
|
||||||
|
|
||||||
use std::borrow::Cow;
|
|
||||||
|
|
||||||
use async_graphql::dynamic::TypeRef;
|
use async_graphql::dynamic::TypeRef;
|
||||||
pub use json::{JSONB_FILTER_INFO, jsonb_filter_condition_function};
|
pub use json::{
|
||||||
|
JSONB_FILTER_NAME, jsonb_filter_condition_function,
|
||||||
|
register_jsonb_input_filter_to_dynamic_schema,
|
||||||
|
};
|
||||||
use maplit::btreeset;
|
use maplit::btreeset;
|
||||||
use seaography::{FilterInfo, FilterOperation as SeaographqlFilterOperation};
|
use seaography::{FilterInfo, FilterOperation as SeaographqlFilterOperation};
|
||||||
pub use subscriber::{SUBSCRIBER_ID_FILTER_INFO, subscriber_id_condition_function};
|
pub use subscriber::{SUBSCRIBER_ID_FILTER_INFO, subscriber_id_condition_function};
|
||||||
@ -15,9 +16,4 @@ pub fn init_custom_filter_info() {
|
|||||||
base_type: TypeRef::INT.into(),
|
base_type: TypeRef::INT.into(),
|
||||||
supported_operations: btreeset! { SeaographqlFilterOperation::Equals },
|
supported_operations: btreeset! { SeaographqlFilterOperation::Equals },
|
||||||
});
|
});
|
||||||
JSONB_FILTER_INFO.get_or_init(|| FilterInfo {
|
|
||||||
type_name: String::from("JsonbFilterInput"),
|
|
||||||
base_type: TypeRef::Named(Cow::Borrowed("serde_json::Value")).to_string(),
|
|
||||||
supported_operations: btreeset! { SeaographqlFilterOperation::Equals },
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
@ -3,14 +3,17 @@ use once_cell::sync::OnceCell;
|
|||||||
use sea_orm::{DatabaseConnection, EntityTrait, Iterable};
|
use sea_orm::{DatabaseConnection, EntityTrait, Iterable};
|
||||||
use seaography::{Builder, BuilderContext, FilterType, FilterTypesMapHelper};
|
use seaography::{Builder, BuilderContext, FilterType, FilterTypesMapHelper};
|
||||||
|
|
||||||
use crate::graphql::infra::{
|
use crate::graphql::{
|
||||||
|
infra::{
|
||||||
filter::{
|
filter::{
|
||||||
JSONB_FILTER_INFO, SUBSCRIBER_ID_FILTER_INFO, init_custom_filter_info,
|
JSONB_FILTER_NAME, SUBSCRIBER_ID_FILTER_INFO, init_custom_filter_info,
|
||||||
subscriber_id_condition_function,
|
register_jsonb_input_filter_to_dynamic_schema, subscriber_id_condition_function,
|
||||||
},
|
},
|
||||||
guard::{guard_entity_with_subscriber_id, guard_field_with_subscriber_id},
|
guard::{guard_entity_with_subscriber_id, guard_field_with_subscriber_id},
|
||||||
transformer::{filter_condition_transformer, mutation_input_object_transformer},
|
transformer::{filter_condition_transformer, mutation_input_object_transformer},
|
||||||
util::{get_entity_column_key, get_entity_key},
|
util::{get_entity_column_key, get_entity_key},
|
||||||
|
},
|
||||||
|
views::register_subscriptions_to_schema,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub static CONTEXT: OnceCell<BuilderContext> = OnceCell::new();
|
pub static CONTEXT: OnceCell<BuilderContext> = OnceCell::new();
|
||||||
@ -35,9 +38,7 @@ where
|
|||||||
let entity_column_key = get_entity_column_key::<T>(context, column);
|
let entity_column_key = get_entity_column_key::<T>(context, column);
|
||||||
context.filter_types.overwrites.insert(
|
context.filter_types.overwrites.insert(
|
||||||
entity_column_key.clone(),
|
entity_column_key.clone(),
|
||||||
Some(FilterType::Custom(
|
Some(FilterType::Custom(JSONB_FILTER_NAME.to_string())),
|
||||||
JSONB_FILTER_INFO.get().unwrap().type_name.clone(),
|
|
||||||
)),
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -94,12 +95,12 @@ pub fn schema(
|
|||||||
let context = CONTEXT.get_or_init(|| {
|
let context = CONTEXT.get_or_init(|| {
|
||||||
let mut context = BuilderContext::default();
|
let mut context = BuilderContext::default();
|
||||||
|
|
||||||
context.pagination_input.type_name = "SeaographyPaginationInput".to_string();
|
context.pagination_input.type_name = "PaginationInput".to_string();
|
||||||
context.pagination_info_object.type_name = "SeaographyPaginationInfo".to_string();
|
context.pagination_info_object.type_name = "PaginationInfo".to_string();
|
||||||
context.cursor_input.type_name = "SeaographyCursorInput".to_string();
|
context.cursor_input.type_name = "CursorInput".to_string();
|
||||||
context.offset_input.type_name = "SeaographyOffsetInput".to_string();
|
context.offset_input.type_name = "OffsetInput".to_string();
|
||||||
context.page_input.type_name = "SeaographyPageInput".to_string();
|
context.page_input.type_name = "PageInput".to_string();
|
||||||
context.page_info_object.type_name = "SeaographyPageInfo".to_string();
|
context.page_info_object.type_name = "PageInfo".to_string();
|
||||||
|
|
||||||
restrict_subscriber_for_entity::<bangumi::Entity>(
|
restrict_subscriber_for_entity::<bangumi::Entity>(
|
||||||
&mut context,
|
&mut context,
|
||||||
@ -160,6 +161,7 @@ pub fn schema(
|
|||||||
builder.schema = builder.schema.register(
|
builder.schema = builder.schema.register(
|
||||||
filter_types_map_helper.generate_filter_input(SUBSCRIBER_ID_FILTER_INFO.get().unwrap()),
|
filter_types_map_helper.generate_filter_input(SUBSCRIBER_ID_FILTER_INFO.get().unwrap()),
|
||||||
);
|
);
|
||||||
|
builder.schema = register_jsonb_input_filter_to_dynamic_schema(builder.schema);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -193,6 +195,10 @@ pub fn schema(
|
|||||||
builder.register_enumeration::<downloads::DownloadMime>();
|
builder.register_enumeration::<downloads::DownloadMime>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
builder = register_subscriptions_to_schema(builder);
|
||||||
|
}
|
||||||
|
|
||||||
let schema = builder.schema_builder();
|
let schema = builder.schema_builder();
|
||||||
|
|
||||||
let schema = if let Some(depth) = depth {
|
let schema = if let Some(depth) = depth {
|
||||||
|
@ -1,2 +1,3 @@
|
|||||||
mod subscription;
|
mod subscription;
|
||||||
mod task;
|
|
||||||
|
pub use subscription::register_subscriptions_to_schema;
|
||||||
|
@ -1,6 +1,11 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_graphql::{Context, InputObject, Object, Result as GraphQLResult, SimpleObject};
|
use async_graphql::dynamic::{
|
||||||
|
Field, FieldFuture, FieldValue, InputObject, InputValue, Object, TypeRef,
|
||||||
|
};
|
||||||
|
use seaography::Builder as SeaographyBuilder;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use util_derive::DynamicGraphql;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
app::AppContextTrait,
|
app::AppContextTrait,
|
||||||
@ -9,104 +14,192 @@ use crate::{
|
|||||||
task::SubscriberTaskPayload,
|
task::SubscriberTaskPayload,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct SubscriptionMutation;
|
#[derive(DynamicGraphql, Serialize, Deserialize, Clone, Debug)]
|
||||||
|
|
||||||
#[derive(InputObject)]
|
|
||||||
struct SyncOneSubscriptionFilterInput {
|
struct SyncOneSubscriptionFilterInput {
|
||||||
pub subscription_id: i32,
|
pub subscription_id: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(SimpleObject)]
|
impl SyncOneSubscriptionFilterInput {
|
||||||
struct SyncOneSubscriptionTaskOutput {
|
fn input_type_name() -> &'static str {
|
||||||
|
"SyncOneSubscriptionFilterInput"
|
||||||
|
}
|
||||||
|
|
||||||
|
fn arg_name() -> &'static str {
|
||||||
|
"filter"
|
||||||
|
}
|
||||||
|
|
||||||
|
fn generate_input_object() -> InputObject {
|
||||||
|
InputObject::new(Self::input_type_name())
|
||||||
|
.description("The input of the subscriptionSyncOne series of mutations")
|
||||||
|
.field(InputValue::new(
|
||||||
|
SyncOneSubscriptionFilterInputFieldEnum::SubscriptionId.as_str(),
|
||||||
|
TypeRef::named_nn(TypeRef::INT),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(DynamicGraphql, Serialize, Deserialize, Clone, Debug)]
|
||||||
|
pub struct SyncOneSubscriptionInfo {
|
||||||
pub task_id: String,
|
pub task_id: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[Object]
|
impl SyncOneSubscriptionInfo {
|
||||||
impl SubscriptionMutation {
|
fn object_type_name() -> &'static str {
|
||||||
async fn sync_one_subscription_feeds_incremental(
|
"SyncOneSubscriptionInfo"
|
||||||
&self,
|
}
|
||||||
ctx: &Context<'_>,
|
|
||||||
input: SyncOneSubscriptionFilterInput,
|
fn generate_output_object() -> Object {
|
||||||
) -> GraphQLResult<SyncOneSubscriptionTaskOutput> {
|
Object::new(Self::object_type_name())
|
||||||
|
.description("The output of the subscriptionSyncOne series of mutations")
|
||||||
|
.field(Field::new(
|
||||||
|
"taskId",
|
||||||
|
TypeRef::named_nn(TypeRef::STRING),
|
||||||
|
move |ctx| {
|
||||||
|
FieldFuture::new(async move {
|
||||||
|
let subscription_info = ctx.parent_value.try_downcast_ref::<Self>()?;
|
||||||
|
Ok(Some(async_graphql::Value::from(
|
||||||
|
subscription_info.task_id.as_str(),
|
||||||
|
)))
|
||||||
|
})
|
||||||
|
},
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register_subscriptions_to_schema(mut builder: SeaographyBuilder) -> SeaographyBuilder {
|
||||||
|
builder.schema = builder
|
||||||
|
.schema
|
||||||
|
.register(SyncOneSubscriptionFilterInput::generate_input_object());
|
||||||
|
builder.schema = builder
|
||||||
|
.schema
|
||||||
|
.register(SyncOneSubscriptionInfo::generate_output_object());
|
||||||
|
|
||||||
|
builder.queries.push(
|
||||||
|
Field::new(
|
||||||
|
"subscriptionSyncOneFeedsIncremental",
|
||||||
|
TypeRef::named_nn(SyncOneSubscriptionInfo::object_type_name()),
|
||||||
|
move |ctx| {
|
||||||
|
FieldFuture::new(async move {
|
||||||
let auth_user_info = ctx.data::<AuthUserInfo>()?;
|
let auth_user_info = ctx.data::<AuthUserInfo>()?;
|
||||||
|
|
||||||
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
|
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
|
||||||
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
|
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
|
||||||
|
|
||||||
|
let filter_input: SyncOneSubscriptionFilterInput = ctx
|
||||||
|
.args
|
||||||
|
.get(SyncOneSubscriptionFilterInput::arg_name())
|
||||||
|
.unwrap()
|
||||||
|
.deserialize()?;
|
||||||
|
|
||||||
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
|
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
|
||||||
app_ctx.as_ref(),
|
app_ctx.as_ref(),
|
||||||
input.subscription_id,
|
filter_input.subscription_id,
|
||||||
subscriber_id,
|
subscriber_id,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?;
|
let subscription =
|
||||||
|
subscriptions::Subscription::try_from_model(&subscription_model)?;
|
||||||
|
|
||||||
let task_service = app_ctx.task();
|
let task_service = app_ctx.task();
|
||||||
|
|
||||||
let task_id = task_service
|
let task_id = task_service
|
||||||
.add_subscriber_task(
|
.add_subscriber_task(
|
||||||
auth_user_info.subscriber_auth.subscriber_id,
|
auth_user_info.subscriber_auth.subscriber_id,
|
||||||
SubscriberTaskPayload::SyncOneSubscriptionFeedsIncremental(subscription.into()),
|
SubscriberTaskPayload::SyncOneSubscriptionFeedsIncremental(
|
||||||
|
subscription.into(),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(SyncOneSubscriptionTaskOutput {
|
Ok(Some(FieldValue::owned_any(SyncOneSubscriptionInfo {
|
||||||
task_id: task_id.to_string(),
|
task_id: task_id.to_string(),
|
||||||
|
})))
|
||||||
})
|
})
|
||||||
}
|
},
|
||||||
|
)
|
||||||
|
.argument(InputValue::new(
|
||||||
|
SyncOneSubscriptionFilterInput::arg_name(),
|
||||||
|
TypeRef::named_nn(SyncOneSubscriptionFilterInput::input_type_name()),
|
||||||
|
)),
|
||||||
|
);
|
||||||
|
|
||||||
async fn sync_one_subscription_feeds_full(
|
builder.queries.push(
|
||||||
&self,
|
Field::new(
|
||||||
ctx: &Context<'_>,
|
"subscriptionSyncOneFeedsFull",
|
||||||
input: SyncOneSubscriptionFilterInput,
|
TypeRef::named_nn(SyncOneSubscriptionInfo::object_type_name()),
|
||||||
) -> GraphQLResult<SyncOneSubscriptionTaskOutput> {
|
move |ctx| {
|
||||||
|
FieldFuture::new(async move {
|
||||||
let auth_user_info = ctx.data::<AuthUserInfo>()?;
|
let auth_user_info = ctx.data::<AuthUserInfo>()?;
|
||||||
|
|
||||||
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
|
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
|
||||||
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
|
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
|
||||||
|
|
||||||
|
let filter_input: SyncOneSubscriptionFilterInput = ctx
|
||||||
|
.args
|
||||||
|
.get(SyncOneSubscriptionFilterInput::arg_name())
|
||||||
|
.unwrap()
|
||||||
|
.deserialize()?;
|
||||||
|
|
||||||
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
|
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
|
||||||
app_ctx.as_ref(),
|
app_ctx.as_ref(),
|
||||||
input.subscription_id,
|
filter_input.subscription_id,
|
||||||
subscriber_id,
|
subscriber_id,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?;
|
let subscription =
|
||||||
|
subscriptions::Subscription::try_from_model(&subscription_model)?;
|
||||||
|
|
||||||
let task_service = app_ctx.task();
|
let task_service = app_ctx.task();
|
||||||
|
|
||||||
let task_id = task_service
|
let task_id = task_service
|
||||||
.add_subscriber_task(
|
.add_subscriber_task(
|
||||||
auth_user_info.subscriber_auth.subscriber_id,
|
auth_user_info.subscriber_auth.subscriber_id,
|
||||||
SubscriberTaskPayload::SyncOneSubscriptionFeedsFull(subscription.into()),
|
SubscriberTaskPayload::SyncOneSubscriptionFeedsFull(
|
||||||
|
subscription.into(),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(SyncOneSubscriptionTaskOutput {
|
Ok(Some(FieldValue::owned_any(SyncOneSubscriptionInfo {
|
||||||
task_id: task_id.to_string(),
|
task_id: task_id.to_string(),
|
||||||
|
})))
|
||||||
})
|
})
|
||||||
}
|
},
|
||||||
|
)
|
||||||
|
.argument(InputValue::new(
|
||||||
|
SyncOneSubscriptionFilterInput::arg_name(),
|
||||||
|
TypeRef::named_nn(SyncOneSubscriptionFilterInput::input_type_name()),
|
||||||
|
)),
|
||||||
|
);
|
||||||
|
|
||||||
async fn sync_one_subscription_sources(
|
builder.mutations.push(
|
||||||
&self,
|
Field::new(
|
||||||
ctx: &Context<'_>,
|
"subscriptionSyncOneSources",
|
||||||
input: SyncOneSubscriptionFilterInput,
|
TypeRef::named_nn(SyncOneSubscriptionInfo::object_type_name()),
|
||||||
) -> GraphQLResult<SyncOneSubscriptionTaskOutput> {
|
move |ctx| {
|
||||||
|
FieldFuture::new(async move {
|
||||||
let auth_user_info = ctx.data::<AuthUserInfo>()?;
|
let auth_user_info = ctx.data::<AuthUserInfo>()?;
|
||||||
|
|
||||||
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
|
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
|
||||||
|
|
||||||
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
|
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
|
||||||
|
|
||||||
|
let filter_input: SyncOneSubscriptionFilterInput = ctx
|
||||||
|
.args
|
||||||
|
.get(SyncOneSubscriptionFilterInput::arg_name())
|
||||||
|
.unwrap()
|
||||||
|
.deserialize()?;
|
||||||
|
|
||||||
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
|
let subscription_model = subscriptions::Model::find_by_id_and_subscriber_id(
|
||||||
app_ctx.as_ref(),
|
app_ctx.as_ref(),
|
||||||
input.subscription_id,
|
filter_input.subscription_id,
|
||||||
subscriber_id,
|
subscriber_id,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let subscription = subscriptions::Subscription::try_from_model(&subscription_model)?;
|
let subscription =
|
||||||
|
subscriptions::Subscription::try_from_model(&subscription_model)?;
|
||||||
|
|
||||||
let task_service = app_ctx.task();
|
let task_service = app_ctx.task();
|
||||||
|
|
||||||
@ -117,8 +210,17 @@ impl SubscriptionMutation {
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(SyncOneSubscriptionTaskOutput {
|
Ok(Some(FieldValue::owned_any(SyncOneSubscriptionInfo {
|
||||||
task_id: task_id.to_string(),
|
task_id: task_id.to_string(),
|
||||||
|
})))
|
||||||
})
|
})
|
||||||
}
|
},
|
||||||
|
)
|
||||||
|
.argument(InputValue::new(
|
||||||
|
SyncOneSubscriptionFilterInput::arg_name(),
|
||||||
|
TypeRef::named_nn(SyncOneSubscriptionFilterInput::input_type_name()),
|
||||||
|
)),
|
||||||
|
);
|
||||||
|
|
||||||
|
builder
|
||||||
}
|
}
|
||||||
|
@ -1,27 +0,0 @@
|
|||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use async_graphql::{Context, InputObject, Object, Result as GraphQLResult};
|
|
||||||
|
|
||||||
use crate::{app::AppContextTrait, auth::AuthUserInfo};
|
|
||||||
|
|
||||||
struct TaskQuery;
|
|
||||||
|
|
||||||
#[derive(InputObject)]
|
|
||||||
struct SubscriberTasksFilterInput {
|
|
||||||
pub subscription_id: Option<i32>,
|
|
||||||
pub task_id: Option<String>,
|
|
||||||
pub task_type: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[Object]
|
|
||||||
impl TaskQuery {
|
|
||||||
async fn subscriber_tasks(&self, ctx: &Context<'_>) -> GraphQLResult<Vec<String>> {
|
|
||||||
let auth_user_info = ctx.data::<AuthUserInfo>()?;
|
|
||||||
let app_ctx = ctx.data::<Arc<dyn AppContextTrait>>()?;
|
|
||||||
let subscriber_id = auth_user_info.subscriber_auth.subscriber_id;
|
|
||||||
|
|
||||||
let task_service = app_ctx.task();
|
|
||||||
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
|
@ -12,10 +12,10 @@ impl MigrationTrait for Migration {
|
|||||||
let db = manager.get_connection();
|
let db = manager.get_connection();
|
||||||
|
|
||||||
db.execute_unprepared(&format!(
|
db.execute_unprepared(&format!(
|
||||||
r#"CREATE VIEW IF NOT EXISTS subscriber_task AS
|
r#"CREATE OR REPLACE VIEW subscriber_task AS
|
||||||
SELECT
|
SELECT
|
||||||
job,
|
job,
|
||||||
task_type,
|
job_type,
|
||||||
status,
|
status,
|
||||||
(job->'subscriber_id')::integer AS subscriber_id,
|
(job->'subscriber_id')::integer AS subscriber_id,
|
||||||
(job->'task_type')::text AS task_type,
|
(job->'task_type')::text AS task_type,
|
||||||
@ -29,7 +29,7 @@ SELECT
|
|||||||
done_at,
|
done_at,
|
||||||
priority
|
priority
|
||||||
FROM apalis.jobs
|
FROM apalis.jobs
|
||||||
WHERE job_type = {SUBSCRIBER_TASK_APALIS_NAME}
|
WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}'
|
||||||
AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
|
AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
|
||||||
AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#,
|
AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#,
|
||||||
))
|
))
|
||||||
@ -38,7 +38,7 @@ AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#,
|
|||||||
db.execute_unprepared(&format!(
|
db.execute_unprepared(&format!(
|
||||||
r#"CREATE INDEX IF NOT EXISTS idx_apalis_jobs_subscriber_id
|
r#"CREATE INDEX IF NOT EXISTS idx_apalis_jobs_subscriber_id
|
||||||
ON apalis.jobs ((job -> 'subscriber_id'))
|
ON apalis.jobs ((job -> 'subscriber_id'))
|
||||||
WHERE job_type = {SUBSCRIBER_TASK_APALIS_NAME}
|
WHERE job_type = '{SUBSCRIBER_TASK_APALIS_NAME}'
|
||||||
AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
|
AND jsonb_path_exists(job, '$.subscriber_id ? (@.type() == "number")')
|
||||||
AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#
|
AND jsonb_path_exists(job, '$.task_type ? (@.type() == "string")')"#
|
||||||
))
|
))
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use apalis::prelude::State;
|
|
||||||
use futures::Stream;
|
use futures::Stream;
|
||||||
use serde::{Serialize, de::DeserializeOwned};
|
use serde::{Serialize, de::DeserializeOwned};
|
||||||
|
|
||||||
|
@ -55,7 +55,9 @@ pub async fn build_testing_database_service(
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(feature = "testcontainers"))]
|
#[cfg(not(feature = "testcontainers"))]
|
||||||
pub async fn build_testing_database_service() -> RecorderResult<DatabaseService> {
|
pub async fn build_testing_database_service(
|
||||||
|
config: TestingDatabaseServiceConfig,
|
||||||
|
) -> RecorderResult<DatabaseService> {
|
||||||
let db_service = DatabaseService::from_config(DatabaseConfig {
|
let db_service = DatabaseService::from_config(DatabaseConfig {
|
||||||
uri: String::from("postgres://konobangu:konobangu@127.0.0.1:5432/konobangu"),
|
uri: String::from("postgres://konobangu:konobangu@127.0.0.1:5432/konobangu"),
|
||||||
enable_logging: true,
|
enable_logging: true,
|
||||||
@ -64,7 +66,7 @@ pub async fn build_testing_database_service() -> RecorderResult<DatabaseService>
|
|||||||
connect_timeout: 5000,
|
connect_timeout: 5000,
|
||||||
idle_timeout: 10000,
|
idle_timeout: 10000,
|
||||||
acquire_timeout: None,
|
acquire_timeout: None,
|
||||||
auto_migrate: true,
|
auto_migrate: config.auto_migrate,
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
16
packages/util-derive/Cargo.toml
Normal file
16
packages/util-derive/Cargo.toml
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
[package]
|
||||||
|
name = "util-derive"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2024"
|
||||||
|
|
||||||
|
[lib]
|
||||||
|
proc-macro = true
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
snafu = { workspace = true }
|
||||||
|
convert_case = { workspace = true }
|
||||||
|
|
||||||
|
quote = "1"
|
||||||
|
syn = "2"
|
||||||
|
darling = "0.20"
|
||||||
|
proc-macro2 = { version = "1" }
|
156
packages/util-derive/src/lib.rs
Normal file
156
packages/util-derive/src/lib.rs
Normal file
@ -0,0 +1,156 @@
|
|||||||
|
extern crate proc_macro;
|
||||||
|
|
||||||
|
use convert_case::{Case, Casing};
|
||||||
|
use darling::{FromDeriveInput, FromField, ast::Data, util::Ignored};
|
||||||
|
use proc_macro::TokenStream;
|
||||||
|
use quote::{format_ident, quote};
|
||||||
|
use syn::{Attribute, DeriveInput, Generics, Ident, parse_macro_input};
|
||||||
|
|
||||||
|
#[derive(snafu::Snafu, Debug)]
|
||||||
|
enum GeneratorError {
|
||||||
|
#[snafu(transparent)]
|
||||||
|
Syn { source: syn::Error },
|
||||||
|
|
||||||
|
#[snafu(transparent)]
|
||||||
|
Darling { source: darling::Error },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GeneratorError {
|
||||||
|
fn write_errors(self) -> proc_macro2::TokenStream {
|
||||||
|
match self {
|
||||||
|
GeneratorError::Syn { source } => source.to_compile_error(),
|
||||||
|
GeneratorError::Darling { source } => source.write_errors(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, FromField)]
|
||||||
|
#[darling(attributes(dyngql))]
|
||||||
|
#[allow(dead_code)]
|
||||||
|
struct DynamicGraphqlFieldInfo {
|
||||||
|
ident: Option<Ident>,
|
||||||
|
ty: syn::Type,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(FromDeriveInput)]
|
||||||
|
#[darling(attributes(dyngql), forward_attrs(doc))]
|
||||||
|
#[allow(dead_code)]
|
||||||
|
struct DynamicGraphqlInfo {
|
||||||
|
pub ident: Ident,
|
||||||
|
pub attrs: Vec<Attribute>,
|
||||||
|
pub generics: Generics,
|
||||||
|
pub data: Data<Ignored, DynamicGraphqlFieldInfo>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DynamicGraphqlInfo {
|
||||||
|
fn expand(&self) -> Result<TokenStream, GeneratorError> {
|
||||||
|
let struct_name = &self.ident;
|
||||||
|
let enum_name = format_ident!("{}FieldEnum", struct_name);
|
||||||
|
|
||||||
|
let fields = self.data.as_ref().take_struct().unwrap();
|
||||||
|
|
||||||
|
let enum_variants = fields
|
||||||
|
.iter()
|
||||||
|
.filter_map(|field| field.ident.as_ref())
|
||||||
|
.map(|field_ident| {
|
||||||
|
let variant_name = Ident::new(
|
||||||
|
&field_ident.to_string().to_case(Case::Pascal),
|
||||||
|
field_ident.span(),
|
||||||
|
);
|
||||||
|
quote! { #variant_name }
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let as_str_arms: Vec<_> = fields
|
||||||
|
.iter()
|
||||||
|
.filter_map(|field| field.ident.as_ref())
|
||||||
|
.map(|field_ident| {
|
||||||
|
let variant_name = Ident::new(
|
||||||
|
&field_ident.to_string().to_case(Case::Pascal),
|
||||||
|
field_ident.span(),
|
||||||
|
);
|
||||||
|
let field_name_str = field_ident.to_string().to_case(Case::Camel);
|
||||||
|
quote! {
|
||||||
|
Self::#variant_name => #field_name_str,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let from_str_arms: Vec<_> = fields
|
||||||
|
.iter()
|
||||||
|
.filter_map(|field| field.ident.as_ref())
|
||||||
|
.map(|field_ident| {
|
||||||
|
let variant_name = Ident::new(
|
||||||
|
&field_ident.to_string().to_case(Case::Pascal),
|
||||||
|
field_ident.span(),
|
||||||
|
);
|
||||||
|
let field_name_str = field_ident.to_string().to_case(Case::Camel);
|
||||||
|
quote! {
|
||||||
|
#field_name_str => Some(Self::#variant_name)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let all_field_names: Vec<_> = fields
|
||||||
|
.iter()
|
||||||
|
.filter_map(|field| field.ident.as_ref())
|
||||||
|
.map(|field_ident| field_ident.to_string().to_case(Case::Camel))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let result = quote! {
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||||
|
pub enum #enum_name {
|
||||||
|
#(#enum_variants),*
|
||||||
|
}
|
||||||
|
|
||||||
|
impl #enum_name {
|
||||||
|
pub fn as_str(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
#(#as_str_arms),*
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_str(s: &str) -> Option<Self> {
|
||||||
|
match s {
|
||||||
|
#(#from_str_arms),* ,
|
||||||
|
_ => None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn all_field_names() -> Vec<&'static str> {
|
||||||
|
vec![#(#all_field_names),*]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for #enum_name {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "{}", self.as_str())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::str::FromStr for #enum_name {
|
||||||
|
type Err = String;
|
||||||
|
|
||||||
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||||
|
Self::from_str(s).ok_or_else(|| format!("Unknown field name: {s}"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(result.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[proc_macro_derive(DynamicGraphql, attributes(dyngql))]
|
||||||
|
pub fn derive_dynamic_graphql(input: TokenStream) -> TokenStream {
|
||||||
|
let opts =
|
||||||
|
match DynamicGraphqlInfo::from_derive_input(&parse_macro_input!(input as DeriveInput)) {
|
||||||
|
Ok(opts) => opts,
|
||||||
|
Err(err) => return TokenStream::from(err.write_errors()),
|
||||||
|
};
|
||||||
|
match opts.expand() {
|
||||||
|
Ok(token_stream) => token_stream,
|
||||||
|
Err(err) => err.write_errors().into(),
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user