diff --git a/Cargo.lock b/Cargo.lock index e78b8a9..9b4915c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -575,6 +575,7 @@ dependencies = [ "axum-core", "bytes", "futures-util", + "headers", "http", "http-body", "http-body-util", @@ -2543,6 +2544,30 @@ dependencies = [ "hashbrown 0.15.4", ] +[[package]] +name = "headers" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb" +dependencies = [ + "base64 0.22.1", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.4.1" @@ -5236,6 +5261,7 @@ dependencies = [ "lightningcss", "log", "maplit", + "mime_guess", "mockito", "moka", "nanoid", diff --git a/Cargo.toml b/Cargo.toml index 68a9c2a..3d54ee7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,6 @@ util-derive = { path = "./packages/util-derive" } fetch = { path = "./packages/fetch" } downloader = { path = "./packages/downloader" } recorder = { path = "./apps/recorder" } -proxy = { path = "./apps/proxy" } reqwest = { version = "0.12.20", features = [ "charset", @@ -62,7 +61,7 @@ regex = "1.11" lazy_static = "1.5" axum = { version = "0.8.3", features = ["macros"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } -axum-extra = "0.10" +axum-extra = { version = "0.10", features = ["typed-header"] } mockito = { version = "1.6.1" } convert_case = "0.8" color-eyre = "0.6.5" diff --git a/apps/recorder/Cargo.toml b/apps/recorder/Cargo.toml index c55301b..8e847b4 100644 --- a/apps/recorder/Cargo.toml +++ b/apps/recorder/Cargo.toml @@ -128,6 +128,7 @@ reqwest_cookie_store = "0.8.0" nanoid = "0.4.0" jwtk = "0.4.0" percent-encoding = "2.3.1" +mime_guess = "2.0.5" [dev-dependencies] diff --git a/apps/recorder/src/app/context.rs b/apps/recorder/src/app/context.rs index 917c61b..66d0a88 100644 --- a/apps/recorder/src/app/context.rs +++ b/apps/recorder/src/app/context.rs @@ -4,17 +4,9 @@ use tokio::sync::OnceCell; use super::{Environment, config::AppConfig}; use crate::{ - auth::AuthService, - cache::CacheService, - crypto::CryptoService, - database::DatabaseService, - errors::RecorderResult, - extract::mikan::MikanClient, - graphql::GraphQLService, - logger::LoggerService, - message::MessageService, - storage::{StorageService, StorageServiceTrait}, - task::TaskService, + auth::AuthService, cache::CacheService, crypto::CryptoService, database::DatabaseService, + errors::RecorderResult, extract::mikan::MikanClient, graphql::GraphQLService, + logger::LoggerService, message::MessageService, storage::StorageService, task::TaskService, }; pub trait AppContextTrait: Send + Sync + Debug { @@ -25,7 +17,7 @@ pub trait AppContextTrait: Send + Sync + Debug { fn mikan(&self) -> &MikanClient; fn auth(&self) -> &AuthService; fn graphql(&self) -> &GraphQLService; - fn storage(&self) -> &dyn StorageServiceTrait; + fn storage(&self) -> &StorageService; fn working_dir(&self) -> &String; fn environment(&self) -> &Environment; fn crypto(&self) -> &CryptoService; @@ -126,7 +118,7 @@ impl AppContextTrait for AppContext { fn graphql(&self) -> &GraphQLService { self.graphql.get().expect("graphql should be set") } - fn storage(&self) -> &dyn StorageServiceTrait { + fn storage(&self) -> &StorageService { &self.storage } fn working_dir(&self) -> &String { diff --git a/apps/recorder/src/auth/errors.rs b/apps/recorder/src/auth/errors.rs index 04c9abd..156c0ec 100644 --- a/apps/recorder/src/auth/errors.rs +++ b/apps/recorder/src/auth/errors.rs @@ -11,13 +11,14 @@ use openidconnect::{ }; use serde::{Deserialize, Serialize}; use snafu::prelude::*; -use util::OptDynErr; use crate::models::auth::AuthType; #[derive(Debug, Snafu)] #[snafu(visibility(pub(crate)))] pub enum AuthError { + #[snafu(display("Permission denied"))] + PermissionError, #[snafu(display("Not support auth method"))] NotSupportAuthMethod { supported: Vec, @@ -93,12 +94,6 @@ pub enum AuthError { column: String, context_path: String, }, - #[snafu(display("GraphQL permission denied since {field}"))] - GraphqlStaticPermissionError { - #[snafu(source)] - source: OptDynErr, - field: String, - }, } impl AuthError { diff --git a/apps/recorder/src/errors/app_error.rs b/apps/recorder/src/errors/app_error.rs index 7561ee2..95e9ce9 100644 --- a/apps/recorder/src/errors/app_error.rs +++ b/apps/recorder/src/errors/app_error.rs @@ -5,8 +5,7 @@ use axum::{ response::{IntoResponse, Response}, }; use fetch::{FetchError, HttpClientError, reqwest, reqwest_middleware}; -use http::StatusCode; -use serde::{Deserialize, Deserializer, Serialize}; +use http::{HeaderMap, StatusCode}; use snafu::Snafu; use crate::{ @@ -19,6 +18,19 @@ use crate::{ #[derive(Snafu, Debug)] #[snafu(visibility(pub(crate)))] pub enum RecorderError { + #[snafu(display( + "HTTP {status} {reason}, source = {source:?}", + status = status, + reason = status.canonical_reason().unwrap_or("Unknown") + ))] + HttpResponseError { + status: StatusCode, + headers: Option, + #[snafu(source(from(Box, OptDynErr::some)))] + source: OptDynErr, + }, + #[snafu(transparent, context(false))] + HttpError { source: http::Error }, #[snafu(transparent, context(false))] FancyRegexError { #[snafu(source(from(fancy_regex::Error, Box::new)))] @@ -128,6 +140,22 @@ pub enum RecorderError { } impl RecorderError { + pub fn from_status(status: StatusCode) -> Self { + Self::HttpResponseError { + status, + headers: None, + source: None.into(), + } + } + + pub fn from_status_and_headers(status: StatusCode, headers: HeaderMap) -> Self { + Self::HttpResponseError { + status, + headers: Some(headers), + source: None.into(), + } + } + pub fn from_mikan_meta_missing_field(field: Cow<'static, str>) -> Self { Self::MikanMetaMissingFieldError { field, @@ -177,10 +205,47 @@ impl snafu::FromString for RecorderError { } } +impl From for RecorderError { + fn from(status: StatusCode) -> Self { + Self::HttpResponseError { + status, + headers: None, + source: None.into(), + } + } +} + +impl From<(StatusCode, HeaderMap)> for RecorderError { + fn from((status, headers): (StatusCode, HeaderMap)) -> Self { + Self::HttpResponseError { + status, + headers: Some(headers), + source: None.into(), + } + } +} + impl IntoResponse for RecorderError { fn into_response(self) -> Response { match self { Self::AuthError { source: auth_error } => auth_error.into_response(), + Self::HttpResponseError { + status, + headers, + source, + } => { + let message = Option::<_>::from(source) + .map(|s| s.to_string()) + .unwrap_or_else(|| { + String::from(status.canonical_reason().unwrap_or("Unknown")) + }); + ( + status, + headers, + Json::(StandardErrorResponse::from(message)), + ) + .into_response() + } err => ( StatusCode::INTERNAL_SERVER_ERROR, Json::(StandardErrorResponse::from(err.to_string())), @@ -190,28 +255,6 @@ impl IntoResponse for RecorderError { } } -impl Serialize for RecorderError { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - serializer.serialize_str(&self.to_string()) - } -} - -impl<'de> Deserialize<'de> for RecorderError { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - Ok(Self::Whatever { - message: s, - source: None.into(), - }) - } -} - impl From for RecorderError { fn from(error: reqwest::Error) -> Self { FetchError::from(error).into() diff --git a/apps/recorder/src/extract/mikan/web.rs b/apps/recorder/src/extract/mikan/web.rs index 9c7d9e3..0660553 100644 --- a/apps/recorder/src/extract/mikan/web.rs +++ b/apps/recorder/src/extract/mikan/web.rs @@ -28,7 +28,7 @@ use crate::{ MIKAN_YEAR_QUERY_KEY, MikanClient, }, }, - storage::{StorageContentCategory, StorageServiceTrait}, + storage::{StorageContentCategory, StorageService}, }; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -739,18 +739,20 @@ pub async fn scrape_mikan_poster_data_from_image_url( #[instrument(skip_all, fields(origin_poster_src_url = origin_poster_src_url.as_str()))] pub async fn scrape_mikan_poster_meta_from_image_url( mikan_client: &MikanClient, - storage_service: &dyn StorageServiceTrait, + storage_service: &StorageService, origin_poster_src_url: Url, subscriber_id: i32, ) -> RecorderResult { if let Some(poster_src) = storage_service - .exists_object( - StorageContentCategory::Image, - subscriber_id, - Some(MIKAN_POSTER_BUCKET_KEY), - &origin_poster_src_url - .path() - .replace(&format!("{MIKAN_BANGUMI_POSTER_PATH}/"), ""), + .exists( + storage_service.build_subscriber_object_path( + StorageContentCategory::Image, + subscriber_id, + MIKAN_POSTER_BUCKET_KEY, + &origin_poster_src_url + .path() + .replace(&format!("{MIKAN_BANGUMI_POSTER_PATH}/"), ""), + ), ) .await? { @@ -765,13 +767,15 @@ pub async fn scrape_mikan_poster_meta_from_image_url( .await?; let poster_str = storage_service - .store_object( - StorageContentCategory::Image, - subscriber_id, - Some(MIKAN_POSTER_BUCKET_KEY), - &origin_poster_src_url - .path() - .replace(&format!("{MIKAN_BANGUMI_POSTER_PATH}/"), ""), + .write( + storage_service.build_subscriber_object_path( + StorageContentCategory::Image, + subscriber_id, + MIKAN_POSTER_BUCKET_KEY, + &origin_poster_src_url + .path() + .replace(&format!("{MIKAN_BANGUMI_POSTER_PATH}/"), ""), + ), poster_data, ) .await?; @@ -1086,10 +1090,10 @@ mod test { resources_mock.shared_resource_mock.expect(1); - let storage_fullname = storage_service.get_fullname( + let storage_fullname = storage_service.build_subscriber_object_path( StorageContentCategory::Image, 1, - Some(MIKAN_POSTER_BUCKET_KEY), + MIKAN_POSTER_BUCKET_KEY, "202309/5ce9fed1.jpg", ); let storage_fullename_str = storage_fullname.as_str(); diff --git a/apps/recorder/src/storage/client.rs b/apps/recorder/src/storage/client.rs index f14ac90..b54ae37 100644 --- a/apps/recorder/src/storage/client.rs +++ b/apps/recorder/src/storage/client.rs @@ -1,8 +1,8 @@ use std::fmt; use bytes::Bytes; -use opendal::{Buffer, Operator, layers::LoggingLayer}; -use quirks_path::{Path, PathBuf}; +use opendal::{Buffer, Metadata, Operator, Reader, Writer, layers::LoggingLayer}; +use quirks_path::PathBuf; use serde::{Deserialize, Serialize}; use url::Url; @@ -43,88 +43,6 @@ impl fmt::Display for StorageStoredUrl { } } -#[async_trait::async_trait] -pub trait StorageServiceTrait: Sync { - fn get_operator(&self) -> RecorderResult; - - fn get_fullname( - &self, - content_category: StorageContentCategory, - subscriber_id: i32, - bucket: Option<&str>, - filename: &str, - ) -> PathBuf { - [ - &subscriber_id.to_string(), - content_category.as_ref(), - bucket.unwrap_or_default(), - filename, - ] - .into_iter() - .map(Path::new) - .collect::() - } - async fn store_object( - &self, - content_category: StorageContentCategory, - subscriber_id: i32, - bucket: Option<&str>, - filename: &str, - data: Bytes, - ) -> RecorderResult { - let fullname = self.get_fullname(content_category, subscriber_id, bucket, filename); - - let operator = self.get_operator()?; - - if let Some(dirname) = fullname.parent() { - let dirname = dirname.join("/"); - operator.create_dir(dirname.as_str()).await?; - } - - operator.write(fullname.as_str(), data).await?; - - Ok(StorageStoredUrl::RelativePath { - path: fullname.to_string(), - }) - } - - async fn exists_object( - &self, - content_category: StorageContentCategory, - subscriber_id: i32, - bucket: Option<&str>, - filename: &str, - ) -> RecorderResult> { - let fullname = self.get_fullname(content_category, subscriber_id, bucket, filename); - - let operator = self.get_operator()?; - - if operator.exists(fullname.as_str()).await? { - Ok(Some(StorageStoredUrl::RelativePath { - path: fullname.to_string(), - })) - } else { - Ok(None) - } - } - - async fn load_object( - &self, - content_category: StorageContentCategory, - subscriber_id: i32, - bucket: Option<&str>, - filename: &str, - ) -> RecorderResult { - let fullname = self.get_fullname(content_category, subscriber_id, bucket, filename); - - let operator = self.get_operator()?; - - let data = operator.read(fullname.as_str()).await?; - - Ok(data) - } -} - #[derive(Debug, Clone)] pub struct StorageService { pub data_dir: String, @@ -136,15 +54,106 @@ impl StorageService { data_dir: config.data_dir.to_string(), }) } -} -#[async_trait::async_trait] -impl StorageServiceTrait for StorageService { - fn get_operator(&self) -> RecorderResult { - let fs_op = Operator::new(opendal::services::Fs::default().root(&self.data_dir))? - .layer(LoggingLayer::default()) - .finish(); + pub fn get_operator(&self) -> Result { + let op = if cfg!(test) { + Operator::new(opendal::services::Memory::default())? + .layer(LoggingLayer::default()) + .finish() + } else { + Operator::new(opendal::services::Fs::default().root(&self.data_dir))? + .layer(LoggingLayer::default()) + .finish() + }; - Ok(fs_op) + Ok(op) + } + + pub fn build_subscriber_path(&self, subscriber_id: i32, path: &str) -> PathBuf { + let mut p = PathBuf::from("/subscribers"); + p.push(subscriber_id.to_string()); + p.push(path); + p + } + + pub fn build_subscriber_object_path( + &self, + content_category: StorageContentCategory, + subscriber_id: i32, + bucket: &str, + object_name: &str, + ) -> PathBuf { + self.build_subscriber_path( + subscriber_id, + &format!("{}/{}/{}", content_category.as_ref(), bucket, object_name), + ) + } + + pub async fn write + Send>( + &self, + path: P, + data: Bytes, + ) -> Result { + let operator = self.get_operator()?; + + let path = path.into(); + + if let Some(dirname) = path.parent() { + let dirname = dirname.join("/"); + operator.create_dir(dirname.as_str()).await?; + } + + operator.write(path.as_str(), data).await?; + + Ok(StorageStoredUrl::RelativePath { + path: path.to_string(), + }) + } + + pub async fn exists( + &self, + path: P, + ) -> Result, opendal::Error> { + let operator = self.get_operator()?; + + let path = path.to_string(); + + if operator.exists(&path).await? { + Ok(Some(StorageStoredUrl::RelativePath { path })) + } else { + Ok(None) + } + } + + pub async fn read(&self, path: impl AsRef) -> Result { + let operator = self.get_operator()?; + + let data = operator.read(path.as_ref()).await?; + + Ok(data) + } + + pub async fn reader(&self, path: impl AsRef) -> Result { + let operator = self.get_operator()?; + + let reader = operator.reader(path.as_ref()).await?; + + Ok(reader) + } + + pub async fn writer(&self, path: impl AsRef) -> Result { + let operator = self.get_operator()?; + + let writer = operator.writer(path.as_ref()).await?; + + Ok(writer) + } + + pub async fn stat(&self, path: impl AsRef) -> Result { + let operator = self.get_operator()?; + + let metadata = operator.stat(path.as_ref()).await?; + + Ok(metadata) } } diff --git a/apps/recorder/src/storage/mod.rs b/apps/recorder/src/storage/mod.rs index 8edde09..5aafd8e 100644 --- a/apps/recorder/src/storage/mod.rs +++ b/apps/recorder/src/storage/mod.rs @@ -1,4 +1,4 @@ mod client; mod config; -pub use client::{StorageContentCategory, StorageService, StorageServiceTrait, StorageStoredUrl}; +pub use client::{StorageContentCategory, StorageService, StorageStoredUrl}; pub use config::StorageConfig; diff --git a/apps/recorder/src/test_utils/app.rs b/apps/recorder/src/test_utils/app.rs index dddc030..70bb31a 100644 --- a/apps/recorder/src/test_utils/app.rs +++ b/apps/recorder/src/test_utils/app.rs @@ -3,7 +3,7 @@ use std::{fmt::Debug, sync::Arc}; use once_cell::sync::OnceCell; use typed_builder::TypedBuilder; -use crate::{app::AppContextTrait, test_utils::storage::TestingStorageService}; +use crate::app::AppContextTrait; #[derive(TypedBuilder)] #[builder(field_defaults(default, setter(strip_option)))] @@ -15,7 +15,7 @@ pub struct TestingAppContext { mikan: Option, auth: Option, graphql: Option, - storage: Option, + storage: Option, crypto: Option, #[builder(default = Arc::new(OnceCell::new()), setter(!strip_option))] task: Arc>, @@ -67,7 +67,7 @@ impl AppContextTrait for TestingAppContext { self.graphql.as_ref().expect("should set graphql") } - fn storage(&self) -> &dyn crate::storage::StorageServiceTrait { + fn storage(&self) -> &crate::storage::StorageService { self.storage.as_ref().expect("should set storage") } diff --git a/apps/recorder/src/test_utils/storage.rs b/apps/recorder/src/test_utils/storage.rs index b07968b..e4abf41 100644 --- a/apps/recorder/src/test_utils/storage.rs +++ b/apps/recorder/src/test_utils/storage.rs @@ -1,28 +1,13 @@ -use opendal::{Operator, layers::LoggingLayer}; +use crate::{ + errors::RecorderResult, + storage::{StorageConfig, StorageService}, +}; -use crate::{errors::RecorderResult, storage::StorageServiceTrait}; +pub async fn build_testing_storage_service() -> RecorderResult { + let service = StorageService::from_config(StorageConfig { + data_dir: "tests/data".to_string(), + }) + .await?; -pub struct TestingStorageService { - operator: Operator, -} - -impl TestingStorageService { - pub fn new() -> RecorderResult { - let op = Operator::new(opendal::services::Memory::default())? - .layer(LoggingLayer::default()) - .finish(); - - Ok(Self { operator: op }) - } -} - -#[async_trait::async_trait] -impl StorageServiceTrait for TestingStorageService { - fn get_operator(&self) -> RecorderResult { - Ok(self.operator.clone()) - } -} - -pub async fn build_testing_storage_service() -> RecorderResult { - TestingStorageService::new() + Ok(service) } diff --git a/apps/recorder/src/utils/http.rs b/apps/recorder/src/utils/http.rs new file mode 100644 index 0000000..2318147 --- /dev/null +++ b/apps/recorder/src/utils/http.rs @@ -0,0 +1,18 @@ +use std::ops::Bound; + +pub fn bound_range_to_content_range( + r: &(Bound, Bound), + l: u64, +) -> Result { + match r { + (Bound::Included(start), Bound::Included(end)) => Ok(format!("bytes {start}-{end}/{l}")), + (Bound::Included(start), Bound::Excluded(end)) => { + Ok(format!("bytes {start}-{}/{l}", end - 1)) + } + (Bound::Included(start), Bound::Unbounded) => Ok(format!( + "bytes {start}-{}/{l}", + if l > 0 { l - 1 } else { 0 } + )), + _ => Err(format!("bytes */{l}")), + } +} diff --git a/apps/recorder/src/utils/mod.rs b/apps/recorder/src/utils/mod.rs index 22fdbb3..22eaf2f 100644 --- a/apps/recorder/src/utils/mod.rs +++ b/apps/recorder/src/utils/mod.rs @@ -1 +1,2 @@ +pub mod http; pub mod json; diff --git a/apps/recorder/src/web/controller/mod.rs b/apps/recorder/src/web/controller/mod.rs index ff42cfc..0508dde 100644 --- a/apps/recorder/src/web/controller/mod.rs +++ b/apps/recorder/src/web/controller/mod.rs @@ -2,5 +2,6 @@ pub mod core; pub mod graphql; pub mod metadata; pub mod oidc; +pub mod r#static; pub use core::{Controller, ControllerTrait, PrefixController}; diff --git a/apps/recorder/src/web/controller/static/mod.rs b/apps/recorder/src/web/controller/static/mod.rs new file mode 100644 index 0000000..2ad4522 --- /dev/null +++ b/apps/recorder/src/web/controller/static/mod.rs @@ -0,0 +1,145 @@ +use std::sync::Arc; + +use async_stream::try_stream; +use axum::{ + Extension, Router, + body::Body, + extract::{Path, State}, + middleware::from_fn_with_state, + response::Response, + routing::get, +}; +use axum_extra::{TypedHeader, headers::Range}; +use bytes::Bytes; +use futures::{Stream, StreamExt}; +use http::{HeaderMap, HeaderValue, StatusCode, header}; +use itertools::Itertools; +use uuid::Uuid; + +use crate::{ + app::AppContextTrait, + auth::{AuthError, AuthUserInfo, auth_middleware}, + errors::{RecorderError, RecorderResult}, + utils::http::bound_range_to_content_range, + web::controller::Controller, +}; + +pub const CONTROLLER_PREFIX: &str = "/api/static"; + +async fn serve_file_with_cache( + State(ctx): State>, + Path((subscriber_id, path)): Path<(i32, String)>, + Extension(auth_user_info): Extension, + range: Option>, +) -> RecorderResult { + if subscriber_id != auth_user_info.subscriber_auth.id { + Err(AuthError::PermissionError)?; + } + + let storage = ctx.storage(); + + let storage_path = storage.build_subscriber_path(subscriber_id, &path); + + let metadata = storage + .stat(&storage_path) + .await + .map_err(|_| RecorderError::from_status(StatusCode::NOT_FOUND))?; + + if !metadata.is_file() { + return Err(RecorderError::from_status(StatusCode::NOT_FOUND)); + } + + let mime_type = mime_guess::from_path(&path).first_or_octet_stream(); + + let response = if let Some(TypedHeader(range)) = range { + let ranges = range + .satisfiable_ranges(metadata.content_length()) + .collect_vec(); + + if ranges.is_empty() { + Response::builder() + .status(StatusCode::PARTIAL_CONTENT) + .header(header::CONTENT_TYPE, mime_type.as_ref()) + .body(Body::empty())? + } else if ranges.len() == 1 { + let r = ranges[0]; + let reader = storage.reader(&storage_path).await?; + let content_range = bound_range_to_content_range(&r, metadata.content_length()) + .map_err(|s| { + RecorderError::from_status_and_headers( + StatusCode::RANGE_NOT_SATISFIABLE, + HeaderMap::from_iter( + [(header::CONTENT_RANGE, HeaderValue::from_str(&s).unwrap())] + .into_iter(), + ), + ) + })?; + let stream = reader.into_bytes_stream(r).await?; + + Response::builder() + .status(StatusCode::PARTIAL_CONTENT) + .header(header::CONTENT_TYPE, mime_type.as_ref()) + .header(header::CONTENT_RANGE, content_range) + .body(Body::from_stream(stream))? + } else { + let boundary = Uuid::new_v4().to_string(); + let reader = storage.reader(&storage_path).await?; + let stream: impl Stream> = { + let boundary = boundary.clone(); + try_stream! { + for r in ranges { + let content_range = bound_range_to_content_range(&r, metadata.content_length()) + .map_err(|s| { + RecorderError::from_status_and_headers( + StatusCode::RANGE_NOT_SATISFIABLE, + HeaderMap::from_iter([(header::CONTENT_RANGE, HeaderValue::from_str(&s).unwrap())].into_iter()), + ) + })?; + let part_header = format!("--{boundary}\r\nContent-Type: {}\r\nContent-Range: {}\r\n\r\n", + mime_type.as_ref(), + content_range, + ); + yield part_header.into(); + let mut part_stream = reader.clone().into_bytes_stream(r).await?; + while let Some(chunk) = part_stream.next().await { + yield chunk?; + } + yield "\r\n".into(); + } + yield format!("--{boundary}--").into(); + } + }; + let body = Body::from_stream(stream); + + Response::builder() + .status(StatusCode::PARTIAL_CONTENT) + .header( + header::CONTENT_TYPE, + HeaderValue::from_str( + format!("multipart/byteranges; boundary={boundary}").as_str(), + ) + .unwrap(), + ) + .body(body)? + } + } else { + let reader = storage.reader(&storage_path).await?; + let stream = reader.into_bytes_stream(..).await?; + + Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, mime_type.as_ref()) + .body(Body::from_stream(stream))? + }; + + Ok(response) +} + +pub async fn create(ctx: Arc) -> RecorderResult { + let router = Router::>::new().route( + "/subscribers/{subscriber_id}/*path", + get(serve_file_with_cache).layer(from_fn_with_state(ctx, auth_middleware)), + ); + + Ok(Controller::from_prefix(CONTROLLER_PREFIX, router)) +}