feat: add new test resource mikan classic episodes tiny.parquet

This commit is contained in:
master 2025-06-23 03:07:58 +08:00
parent f055011b86
commit cde3361458
4 changed files with 145 additions and 1 deletion

View File

@@ -27,3 +27,5 @@ node_modules
dist/
temp/*
!temp/.gitkeep
tests/resources/mikan/classic_episodes/*/*
!tests/resources/mikan/classic_episodes/parquet/tiny.parquet
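The re-include on the last line works because `*/*` only ignores entries two levels below `classic_episodes`, never the `parquet/` directory itself; git cannot un-ignore a file whose parent directory is already excluded, so this pattern shape is what keeps `tiny.parquet` trackable.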

View File

@@ -154,7 +154,11 @@ icu = "2.0.0"
tracing-tree = "0.4.0"
num_cpus = "1.17.0"
headers-accept = "0.1.4"
polars = { version = "0.49.1", features = [
    "parquet",
    "lazy",
    "diagonal_concat",
], optional = true }
[dev-dependencies]
inquire = { workspace = true }
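The two new feature flags map directly onto the API the merge code uses: `lazy` enables `LazyFrame`, and `diagonal_concat` enables `concat_lf_diagonal`, which unions frames whose column sets only partially overlap (missing columns are filled with nulls). A minimal sketch, assuming the polars 0.49 prelude:

use polars::prelude::*;

// Union frames that may have drifted schemas across scrape revisions;
// columns absent from a frame come back as nulls rather than erroring.
fn diagonal_union(frames: Vec<DataFrame>) -> PolarsResult<DataFrame> {
    let lazy: Vec<LazyFrame> = frames.into_iter().map(|df| df.lazy()).collect();
    concat_lf_diagonal(&lazy, UnionArgs::default())?.collect()
}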

View File

@@ -2,6 +2,7 @@ use std::collections::HashSet;
use chrono::{DateTime, Duration, FixedOffset, NaiveDate, NaiveTime, TimeZone, Utc};
use fetch::{HttpClientConfig, fetch_html};
use itertools::Itertools;
use lazy_static::lazy_static;
use nom::{
    IResult, Parser,
@@ -398,6 +399,136 @@ async fn scrape_mikan_classic_episode_table_page_from_rev_id(
    scrape_mikan_classic_episode_table_page(mikan_client, page, Some((rev_idx, total))).await
}
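/// Merge every `rev_*.parquet` shard under `TEST_FOLDER/parquet` into one
/// deduplicated DataFrame, then write the `tiny`, `lite`, and `full` column
/// subsets back out next to the shards.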
async fn merge_mikan_classic_episodes_and_strip_columns() -> RecorderResult<()> {
    use polars::prelude::*;

    let dir = TEST_FOLDER.join("parquet");
    let files = std::fs::read_dir(dir)?;
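    // Keep only the per-revision `rev_*.parquet` shards; the merged
    // tiny/lite/full outputs land in the same directory and must be skipped.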
    let parquet_paths = files
        .filter_map(|f| f.ok())
        .filter_map(|f| {
            let path = f.path();
            if let Some(ext) = path.extension()
                && ext == "parquet"
                && path
                    .file_stem()
                    .is_some_and(|f| f.to_string_lossy().starts_with("rev_"))
            {
                Some(path)
            } else {
                None
            }
        })
        .collect::<Vec<_>>();
    if parquet_paths.is_empty() {
        return Err(RecorderError::without_source(
            "No parquet files found to merge".into(),
        ));
    }

    println!("Found {} parquet files to merge", parquet_paths.len());
    // Read and merge all parquet files
    let mut all_dfs = Vec::new();
    for path in &parquet_paths {
        println!("Reading {path:?}");
        let file = std::fs::File::open(path)?;
        let df = ParquetReader::new(file).finish().map_err(|e| {
            let message = format!("Failed to read parquet file {path:?}: {e}");
            RecorderError::with_source(Box::new(e), message)
        })?;
        all_dfs.push(df);
    }

    let lazy_frames: Vec<LazyFrame> = all_dfs.into_iter().map(|df| df.lazy()).collect();
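    // Diagonal concat tolerates shards whose column sets differ; sorting
    // newest-first and then keeping the first duplicate retains the latest
    // row per (mikan_fansub_id, mikan_episode_id) pair.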
    let merged_df = concat_lf_diagonal(&lazy_frames, UnionArgs::default())
        .map_err(|e| {
            let message = format!("Failed to concat DataFrames: {e}");
            RecorderError::with_source(Box::new(e), message)
        })?
        .sort(
            ["publish_at_timestamp"],
            SortMultipleOptions::default().with_order_descending(true),
        )
        .unique(
            Some(vec![
                "mikan_fansub_id".to_string(),
                "mikan_episode_id".to_string(),
            ]),
            UniqueKeepStrategy::First,
        )
        .collect()
        .map_err(|e| {
            let message = format!("Failed to collect lazy DataFrame: {e}");
            RecorderError::with_source(Box::new(e), message)
        })?;
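    // Helper: sort ascending by publish time, keep only `columns`, and write
    // the result to `parquet/{name}.parquet`.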
    fn select_columns_and_write(
        merged_df: DataFrame,
        name: &str,
        columns: &[&str],
    ) -> RecorderResult<()> {
        let mut result_df = merged_df
            .lazy()
            .sort(["publish_at_timestamp"], SortMultipleOptions::default())
            .select(columns.iter().map(|c| col(*c)).collect_vec())
            .collect()
            .map_err(|e| {
                let message = format!("Failed to sort and select columns: {e}");
                RecorderError::with_source(Box::new(e), message)
            })?;
        let output_path = TEST_FOLDER.join(format!("parquet/{name}.parquet"));
        let mut output_file = std::fs::File::create(&output_path)?;

        // Zstd level 22 is the maximum: slowest writes, smallest fixtures.
        // `finish` takes `&mut DataFrame`, so reuse `result_df` directly
        // instead of cloning it just for the write.
        ParquetWriter::new(&mut output_file)
            .set_parallel(true)
            .with_compression(ParquetCompression::Zstd(Some(
                ZstdLevel::try_new(22).unwrap(),
            )))
            .finish(&mut result_df)
            .map_err(|e| {
                let message = format!("Failed to write merged parquet file: {e}");
                RecorderError::with_source(Box::new(e), message)
            })?;

        println!("Merged {} rows into {output_path:?}", result_df.height());
        Ok(())
    }
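    // Three fixture sizes: "tiny" carries just the two name columns, "lite"
    // adds the Mikan ids, and "full" keeps every scraped column, including
    // the torrent metadata.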
    select_columns_and_write(merged_df.clone(), "tiny", &["fansub_name", "original_name"])?;

    select_columns_and_write(
        merged_df.clone(),
        "lite",
        &[
            "mikan_fansub_id",
            "fansub_name",
            "mikan_episode_id",
            "original_name",
        ],
    )?;

    select_columns_and_write(
        merged_df,
        "full",
        &[
            "id",
            "publish_at_timestamp",
            "mikan_fansub_id",
            "fansub_name",
            "mikan_episode_id",
            "original_name",
            "magnet_link",
            "file_size",
            "torrent_link",
        ],
    )?;

    Ok(())
}
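For consumers of the committed fixture, reading it back is the mirror image of the writer above. A minimal sketch, assuming the polars prelude and the fixture path relative to the crate root (the helper name is hypothetical):

use polars::prelude::*;

// Load the committed tiny fixture; the path mirrors the .gitignore exception.
fn load_tiny_fixture() -> PolarsResult<DataFrame> {
    let file =
        std::fs::File::open("tests/resources/mikan/classic_episodes/parquet/tiny.parquet")?;
    ParquetReader::new(file).finish()
}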
#[tokio::main]
async fn main() -> RecorderResult<()> {
    std::fs::create_dir_all(TEST_FOLDER.join("html"))?;
@@ -442,5 +573,12 @@ async fn main() -> RecorderResult<()> {
        page.save_to_files()?;
    }
    // Merge all parquet files
    println!("\nMerging all parquet files...");
    merge_mikan_classic_episodes_and_strip_columns().await?;
    println!("Merge completed!");
    Ok(())
}