refactor: rewrite origin name extractor from regex to nom combinators

This commit is contained in:
2025-06-19 02:37:56 +08:00
parent c12b9b360a
commit 324427513c
10 changed files with 2241 additions and 900 deletions

View File

@@ -5,6 +5,7 @@ use std::{
};
use async_graphql::{InputObject, SimpleObject};
use async_stream::try_stream;
use fetch::fetch_bytes;
use futures::{Stream, TryStreamExt, pin_mut, try_join};
use maplit::hashmap;
@@ -292,17 +293,19 @@ impl SubscriptionTrait for MikanSeasonSubscription {
}
async fn sync_feeds_incremental(&self, ctx: Arc<dyn AppContextTrait>) -> RecorderResult<()> {
let rss_item_list = self
.get_rss_item_list_from_subsribed_url_rss_link(ctx.as_ref())
.await?;
let rss_item_stream = self.get_rss_item_stream_from_subsribed_url_rss_link(ctx.as_ref());
sync_mikan_feeds_from_rss_item_list(
ctx.as_ref(),
rss_item_list,
self.get_subscriber_id(),
self.get_subscription_id(),
)
.await?;
pin_mut!(rss_item_stream);
while let Some(rss_item_chunk_list) = rss_item_stream.try_next().await? {
sync_mikan_feeds_from_rss_item_list(
ctx.as_ref(),
rss_item_chunk_list,
self.get_subscriber_id(),
self.get_subscription_id(),
)
.await?;
}
Ok(())
}
@@ -393,48 +396,53 @@ impl MikanSeasonSubscription {
)
}
#[tracing::instrument(err, skip(ctx))]
async fn get_rss_item_list_from_subsribed_url_rss_link(
fn get_rss_item_stream_from_subsribed_url_rss_link(
&self,
ctx: &dyn AppContextTrait,
) -> RecorderResult<Vec<MikanRssEpisodeItem>> {
let db = ctx.db();
) -> impl Stream<Item = RecorderResult<Vec<MikanRssEpisodeItem>>> {
try_stream! {
let subscribed_bangumi_list = bangumi::Entity::find()
.filter(
Condition::all()
.add(subscription_bangumi::Column::SubscriptionId.eq(self.subscription_id)),
)
.join_rev(
JoinType::InnerJoin,
subscription_bangumi::Relation::Bangumi.def(),
)
.all(db)
.await?;
let db = ctx.db();
let mut rss_item_list = vec![];
for subscribed_bangumi in subscribed_bangumi_list {
let rss_url = subscribed_bangumi
.rss_link
.with_whatever_context::<_, String, RecorderError>(|| {
format!(
"rss_link is required, subscription_id = {}, bangumi_name = {}",
self.subscription_id, subscribed_bangumi.display_name
)
})?;
let bytes = fetch_bytes(ctx.mikan(), rss_url).await?;
let subscribed_bangumi_list = bangumi::Entity::find()
.filter(
Condition::all()
.add(subscription_bangumi::Column::SubscriptionId.eq(self.subscription_id)),
)
.join_rev(
JoinType::InnerJoin,
subscription_bangumi::Relation::Bangumi.def(),
)
.all(db)
.await?;
let channel = rss::Channel::read_from(&bytes[..])?;
for (idx, item) in channel.items.into_iter().enumerate() {
let item = MikanRssEpisodeItem::try_from(item)
.with_whatever_context::<_, String, RecorderError>(|_| {
format!("failed to extract rss item at idx {idx}")
for subscribed_bangumi in subscribed_bangumi_list {
let rss_url = subscribed_bangumi
.rss_link
.with_whatever_context::<_, String, RecorderError>(|| {
format!(
"rss_link is required, subscription_id = {}, bangumi_name = {}",
self.subscription_id, subscribed_bangumi.display_name
)
})?;
rss_item_list.push(item);
let bytes = fetch_bytes(ctx.mikan(), rss_url).await?;
let channel = rss::Channel::read_from(&bytes[..])?;
let mut rss_item_list = vec![];
for (idx, item) in channel.items.into_iter().enumerate() {
let item = MikanRssEpisodeItem::try_from(item)
.with_whatever_context::<_, String, RecorderError>(|_| {
format!("failed to extract rss item at idx {idx}")
})?;
rss_item_list.push(item);
}
yield rss_item_list;
}
}
Ok(rss_item_list)
}
}