diff --git a/apps/recorder/.gitignore b/apps/recorder/.gitignore
index 1b5bbe1..3d2eadd 100644
--- a/apps/recorder/.gitignore
+++ b/apps/recorder/.gitignore
@@ -27,3 +27,5 @@ node_modules
 dist/
 temp/*
 !temp/.gitkeep
+tests/resources/mikan/classic_episodes/*/*
+!tests/resources/mikan/classic_episodes/parquet/tiny.parquet
\ No newline at end of file
diff --git a/apps/recorder/Cargo.toml b/apps/recorder/Cargo.toml
index 4cb8713..a8a9b8e 100644
--- a/apps/recorder/Cargo.toml
+++ b/apps/recorder/Cargo.toml
@@ -154,7 +154,11 @@ icu = "2.0.0"
 tracing-tree = "0.4.0"
 num_cpus = "1.17.0"
 headers-accept = "0.1.4"
-polars = { version = "0.49.1", features = ["parquet"], optional = true }
+polars = { version = "0.49.1", features = [
+    "parquet",
+    "lazy",
+    "diagonal_concat",
+], optional = true }
 
 [dev-dependencies]
 inquire = { workspace = true }
diff --git a/apps/recorder/examples/mikan_collect_classic_eps.rs b/apps/recorder/examples/mikan_collect_classic_eps.rs
index 0a5fd54..257e797 100644
--- a/apps/recorder/examples/mikan_collect_classic_eps.rs
+++ b/apps/recorder/examples/mikan_collect_classic_eps.rs
@@ -2,6 +2,7 @@ use std::collections::HashSet;
 
 use chrono::{DateTime, Duration, FixedOffset, NaiveDate, NaiveTime, TimeZone, Utc};
 use fetch::{HttpClientConfig, fetch_html};
+use itertools::Itertools;
 use lazy_static::lazy_static;
 use nom::{
     IResult, Parser,
@@ -398,6 +399,136 @@ async fn scrape_mikan_classic_episode_table_page_from_rev_id(
     scrape_mikan_classic_episode_table_page(mikan_client, page, Some((rev_idx, total))).await
 }
 
+async fn merge_mikan_classic_episodes_and_strip_columns() -> RecorderResult<()> {
+    use polars::prelude::*;
+
+    let dir = TEST_FOLDER.join("parquet");
+    let files = std::fs::read_dir(dir)?;
+
+    let parquet_paths = files
+        .filter_map(|f| f.ok())
+        .filter_map(|f| {
+            let path = f.path();
+            if let Some(ext) = path.extension()
+                && ext == "parquet"
+                && path
+                    .file_stem()
+                    .is_some_and(|f| f.to_string_lossy().starts_with("rev_"))
+            {
+                Some(path)
+            } else {
+                None
+            }
+        })
+        .collect::<Vec<_>>();
+
+    if parquet_paths.is_empty() {
+        return Err(RecorderError::without_source(
+            "No parquet files found to merge".into(),
+        ));
+    }
+
+    println!("Found {} parquet files to merge", parquet_paths.len());
+
+    // Read and merge all parquet files
+    let mut all_dfs = Vec::new();
+    for path in &parquet_paths {
+        println!("Reading {path:?}");
+        let file = std::fs::File::open(path)?;
+        let df = ParquetReader::new(file).finish().map_err(|e| {
+            let message = format!("Failed to read parquet file {path:?}: {e}");
+            RecorderError::with_source(Box::new(e), message)
+        })?;
+        all_dfs.push(df);
+    }
+
+    let lazy_frames: Vec<LazyFrame> = all_dfs.into_iter().map(|df| df.lazy()).collect();
+
+    let merged_df = concat_lf_diagonal(&lazy_frames, UnionArgs::default())
+        .map_err(|e| {
+            let message = format!("Failed to concat DataFrames: {e}");
+            RecorderError::with_source(Box::new(e), message)
+        })?
+        .sort(
+            ["publish_at_timestamp"],
+            SortMultipleOptions::default().with_order_descending(true),
+        )
+        .unique(
+            Some(vec![
+                "mikan_fansub_id".to_string(),
+                "mikan_episode_id".to_string(),
+            ]),
+            UniqueKeepStrategy::First,
+        )
+        .collect()
+        .map_err(|e| {
+            let message = format!("Failed to collect lazy DataFrame: {e}");
+            RecorderError::with_source(Box::new(e), message)
+        })?;
+
+    fn select_columns_and_write(
+        merged_df: DataFrame,
+        name: &str,
+        columns: &[&str],
+    ) -> RecorderResult<()> {
+        let result_df = merged_df
+            .lazy()
+            .sort(["publish_at_timestamp"], SortMultipleOptions::default())
+            .select(columns.iter().map(|c| col(*c)).collect_vec())
+            .collect()
+            .map_err(|e| {
+                let message = format!("Failed to sort and select columns: {e}");
+                RecorderError::with_source(Box::new(e), message)
+            })?;
+
+        let output_path = TEST_FOLDER.join(format!("parquet/{name}.parquet"));
+        let mut output_file = std::fs::File::create(&output_path)?;
+
+        ParquetWriter::new(&mut output_file)
+            .set_parallel(true)
+            .with_compression(ParquetCompression::Zstd(Some(
+                ZstdLevel::try_new(22).unwrap(),
+            )))
+            .finish(&mut result_df.clone())
+            .map_err(|e| {
+                let message = format!("Failed to write merged parquet file: {e}");
+                RecorderError::with_source(Box::new(e), message)
+            })?;
+
+        println!("Merged {} rows into {output_path:?}", result_df.height());
+        Ok(())
+    }
+
+    select_columns_and_write(merged_df.clone(), "tiny", &["fansub_name", "original_name"])?;
+    select_columns_and_write(
+        merged_df.clone(),
+        "lite",
+        &[
+            "mikan_fansub_id",
+            "fansub_name",
+            "mikan_episode_id",
+            "original_name",
+        ],
+    )?;
+    select_columns_and_write(
+        merged_df,
+        "full",
+        &[
+            "id",
+            "publish_at_timestamp",
+            "mikan_fansub_id",
+            "fansub_name",
+            "mikan_episode_id",
+            "original_name",
+            "magnet_link",
+            "file_size",
+            "torrent_link",
+        ],
+    )?;
+
+    Ok(())
+}
+
 #[tokio::main]
 async fn main() -> RecorderResult<()> {
     std::fs::create_dir_all(TEST_FOLDER.join("html"))?;
@@ -442,5 +573,12 @@ async fn main() -> RecorderResult<()> {
         page.save_to_files()?;
     }
 
+    // Merge all parquet files
+    println!("\nMerging all parquet files...");
+
+    merge_mikan_classic_episodes_and_strip_columns().await?;
+
+    println!("Merge completed!");
+
     Ok(())
 }
diff --git a/apps/recorder/tests/resources/mikan/classic_episodes/parquet/tiny.parquet b/apps/recorder/tests/resources/mikan/classic_episodes/parquet/tiny.parquet
new file mode 100644
index 0000000..c7a7a69
Binary files /dev/null and b/apps/recorder/tests/resources/mikan/classic_episodes/parquet/tiny.parquet differ
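
Note on the merge step above: concat_lf_diagonal unions mismatched schemas (columns missing from one frame are null-filled), and the descending sort on publish_at_timestamp followed by unique(..., UniqueKeepStrategy::First) keeps only the newest row per (mikan_fansub_id, mikan_episode_id) pair. A minimal standalone sketch of that pattern, using toy frames and a single key instead of the real rev_*.parquet dumps:

    use polars::prelude::*;

    fn main() -> PolarsResult<()> {
        // Two toy frames standing in for rev_*.parquet dumps; rev_2 has an extra
        // column, which diagonal concat fills with nulls for rev_1's rows.
        let rev_1 = df!(
            "mikan_episode_id" => ["e1", "e2"],
            "publish_at_timestamp" => [100i64, 200]
        )?;
        let rev_2 = df!(
            "mikan_episode_id" => ["e2", "e3"],
            "publish_at_timestamp" => [250i64, 300],
            "magnet_link" => ["m2", "m3"]
        )?;

        // Newest-first sort + keep-first unique = "latest row wins" per episode id.
        let merged = concat_lf_diagonal(&[rev_1.lazy(), rev_2.lazy()], UnionArgs::default())?
            .sort(
                ["publish_at_timestamp"],
                SortMultipleOptions::default().with_order_descending(true),
            )
            .unique(
                Some(vec!["mikan_episode_id".to_string()]),
                UniqueKeepStrategy::First,
            )
            .collect()?;

        // Expect three rows: e3 (300), e2 (250, the newer duplicate wins), e1 (100).
        println!("{merged}");
        Ok(())
    }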
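
The committed tiny.parquet fixture is the two-column variant written by select_columns_and_write above, small enough to keep in the repo. A sketch of loading it back for a test, assuming the working directory is apps/recorder (the width/column assertions here are illustrative, not part of this change):

    use polars::prelude::*;

    fn main() -> PolarsResult<()> {
        // Fixture path added by this patch, relative to apps/recorder.
        let path = "tests/resources/mikan/classic_episodes/parquet/tiny.parquet";
        let df = ParquetReader::new(std::fs::File::open(path)?).finish()?;

        // The tiny variant keeps exactly two columns: fansub_name and original_name.
        assert_eq!(df.width(), 2);
        df.column("fansub_name")?;
        df.column("original_name")?;
        println!("tiny fixture: {} rows", df.height());
        Ok(())
    }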