fix: fix issues

This commit is contained in:
master 2025-06-17 02:23:02 +08:00
parent 721eee9c88
commit 35312ea1ff
15 changed files with 404 additions and 185 deletions

26
Cargo.lock generated
View File

@ -575,6 +575,7 @@ dependencies = [
"axum-core", "axum-core",
"bytes", "bytes",
"futures-util", "futures-util",
"headers",
"http", "http",
"http-body", "http-body",
"http-body-util", "http-body-util",
@ -2543,6 +2544,30 @@ dependencies = [
"hashbrown 0.15.4", "hashbrown 0.15.4",
] ]
[[package]]
name = "headers"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb"
dependencies = [
"base64 0.22.1",
"bytes",
"headers-core",
"http",
"httpdate",
"mime",
"sha1",
]
[[package]]
name = "headers-core"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4"
dependencies = [
"http",
]
[[package]] [[package]]
name = "heck" name = "heck"
version = "0.4.1" version = "0.4.1"
@ -5236,6 +5261,7 @@ dependencies = [
"lightningcss", "lightningcss",
"log", "log",
"maplit", "maplit",
"mime_guess",
"mockito", "mockito",
"moka", "moka",
"nanoid", "nanoid",

View File

@ -26,7 +26,6 @@ util-derive = { path = "./packages/util-derive" }
fetch = { path = "./packages/fetch" } fetch = { path = "./packages/fetch" }
downloader = { path = "./packages/downloader" } downloader = { path = "./packages/downloader" }
recorder = { path = "./apps/recorder" } recorder = { path = "./apps/recorder" }
proxy = { path = "./apps/proxy" }
reqwest = { version = "0.12.20", features = [ reqwest = { version = "0.12.20", features = [
"charset", "charset",
@ -62,7 +61,7 @@ regex = "1.11"
lazy_static = "1.5" lazy_static = "1.5"
axum = { version = "0.8.3", features = ["macros"] } axum = { version = "0.8.3", features = ["macros"] }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
axum-extra = "0.10" axum-extra = { version = "0.10", features = ["typed-header"] }
mockito = { version = "1.6.1" } mockito = { version = "1.6.1" }
convert_case = "0.8" convert_case = "0.8"
color-eyre = "0.6.5" color-eyre = "0.6.5"

View File

@ -128,6 +128,7 @@ reqwest_cookie_store = "0.8.0"
nanoid = "0.4.0" nanoid = "0.4.0"
jwtk = "0.4.0" jwtk = "0.4.0"
percent-encoding = "2.3.1" percent-encoding = "2.3.1"
mime_guess = "2.0.5"
[dev-dependencies] [dev-dependencies]

View File

@ -4,17 +4,9 @@ use tokio::sync::OnceCell;
use super::{Environment, config::AppConfig}; use super::{Environment, config::AppConfig};
use crate::{ use crate::{
auth::AuthService, auth::AuthService, cache::CacheService, crypto::CryptoService, database::DatabaseService,
cache::CacheService, errors::RecorderResult, extract::mikan::MikanClient, graphql::GraphQLService,
crypto::CryptoService, logger::LoggerService, message::MessageService, storage::StorageService, task::TaskService,
database::DatabaseService,
errors::RecorderResult,
extract::mikan::MikanClient,
graphql::GraphQLService,
logger::LoggerService,
message::MessageService,
storage::{StorageService, StorageServiceTrait},
task::TaskService,
}; };
pub trait AppContextTrait: Send + Sync + Debug { pub trait AppContextTrait: Send + Sync + Debug {
@ -25,7 +17,7 @@ pub trait AppContextTrait: Send + Sync + Debug {
fn mikan(&self) -> &MikanClient; fn mikan(&self) -> &MikanClient;
fn auth(&self) -> &AuthService; fn auth(&self) -> &AuthService;
fn graphql(&self) -> &GraphQLService; fn graphql(&self) -> &GraphQLService;
fn storage(&self) -> &dyn StorageServiceTrait; fn storage(&self) -> &StorageService;
fn working_dir(&self) -> &String; fn working_dir(&self) -> &String;
fn environment(&self) -> &Environment; fn environment(&self) -> &Environment;
fn crypto(&self) -> &CryptoService; fn crypto(&self) -> &CryptoService;
@ -126,7 +118,7 @@ impl AppContextTrait for AppContext {
fn graphql(&self) -> &GraphQLService { fn graphql(&self) -> &GraphQLService {
self.graphql.get().expect("graphql should be set") self.graphql.get().expect("graphql should be set")
} }
fn storage(&self) -> &dyn StorageServiceTrait { fn storage(&self) -> &StorageService {
&self.storage &self.storage
} }
fn working_dir(&self) -> &String { fn working_dir(&self) -> &String {

View File

@ -11,13 +11,14 @@ use openidconnect::{
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use snafu::prelude::*; use snafu::prelude::*;
use util::OptDynErr;
use crate::models::auth::AuthType; use crate::models::auth::AuthType;
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))] #[snafu(visibility(pub(crate)))]
pub enum AuthError { pub enum AuthError {
#[snafu(display("Permission denied"))]
PermissionError,
#[snafu(display("Not support auth method"))] #[snafu(display("Not support auth method"))]
NotSupportAuthMethod { NotSupportAuthMethod {
supported: Vec<AuthType>, supported: Vec<AuthType>,
@ -93,12 +94,6 @@ pub enum AuthError {
column: String, column: String,
context_path: String, context_path: String,
}, },
#[snafu(display("GraphQL permission denied since {field}"))]
GraphqlStaticPermissionError {
#[snafu(source)]
source: OptDynErr,
field: String,
},
} }
impl AuthError { impl AuthError {

View File

@ -5,8 +5,7 @@ use axum::{
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use fetch::{FetchError, HttpClientError, reqwest, reqwest_middleware}; use fetch::{FetchError, HttpClientError, reqwest, reqwest_middleware};
use http::StatusCode; use http::{HeaderMap, StatusCode};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::Snafu; use snafu::Snafu;
use crate::{ use crate::{
@ -19,6 +18,19 @@ use crate::{
#[derive(Snafu, Debug)] #[derive(Snafu, Debug)]
#[snafu(visibility(pub(crate)))] #[snafu(visibility(pub(crate)))]
pub enum RecorderError { pub enum RecorderError {
#[snafu(display(
"HTTP {status} {reason}, source = {source:?}",
status = status,
reason = status.canonical_reason().unwrap_or("Unknown")
))]
HttpResponseError {
status: StatusCode,
headers: Option<HeaderMap>,
#[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, OptDynErr::some)))]
source: OptDynErr,
},
#[snafu(transparent, context(false))]
HttpError { source: http::Error },
#[snafu(transparent, context(false))] #[snafu(transparent, context(false))]
FancyRegexError { FancyRegexError {
#[snafu(source(from(fancy_regex::Error, Box::new)))] #[snafu(source(from(fancy_regex::Error, Box::new)))]
@ -128,6 +140,22 @@ pub enum RecorderError {
} }
impl RecorderError { impl RecorderError {
pub fn from_status(status: StatusCode) -> Self {
Self::HttpResponseError {
status,
headers: None,
source: None.into(),
}
}
pub fn from_status_and_headers(status: StatusCode, headers: HeaderMap) -> Self {
Self::HttpResponseError {
status,
headers: Some(headers),
source: None.into(),
}
}
pub fn from_mikan_meta_missing_field(field: Cow<'static, str>) -> Self { pub fn from_mikan_meta_missing_field(field: Cow<'static, str>) -> Self {
Self::MikanMetaMissingFieldError { Self::MikanMetaMissingFieldError {
field, field,
@ -177,10 +205,47 @@ impl snafu::FromString for RecorderError {
} }
} }
impl From<StatusCode> for RecorderError {
fn from(status: StatusCode) -> Self {
Self::HttpResponseError {
status,
headers: None,
source: None.into(),
}
}
}
impl From<(StatusCode, HeaderMap)> for RecorderError {
fn from((status, headers): (StatusCode, HeaderMap)) -> Self {
Self::HttpResponseError {
status,
headers: Some(headers),
source: None.into(),
}
}
}
impl IntoResponse for RecorderError { impl IntoResponse for RecorderError {
fn into_response(self) -> Response { fn into_response(self) -> Response {
match self { match self {
Self::AuthError { source: auth_error } => auth_error.into_response(), Self::AuthError { source: auth_error } => auth_error.into_response(),
Self::HttpResponseError {
status,
headers,
source,
} => {
let message = Option::<_>::from(source)
.map(|s| s.to_string())
.unwrap_or_else(|| {
String::from(status.canonical_reason().unwrap_or("Unknown"))
});
(
status,
headers,
Json::<StandardErrorResponse>(StandardErrorResponse::from(message)),
)
.into_response()
}
err => ( err => (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
Json::<StandardErrorResponse>(StandardErrorResponse::from(err.to_string())), Json::<StandardErrorResponse>(StandardErrorResponse::from(err.to_string())),
@ -190,28 +255,6 @@ impl IntoResponse for RecorderError {
} }
} }
impl Serialize for RecorderError {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.to_string())
}
}
impl<'de> Deserialize<'de> for RecorderError {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
Ok(Self::Whatever {
message: s,
source: None.into(),
})
}
}
impl From<reqwest::Error> for RecorderError { impl From<reqwest::Error> for RecorderError {
fn from(error: reqwest::Error) -> Self { fn from(error: reqwest::Error) -> Self {
FetchError::from(error).into() FetchError::from(error).into()

View File

@ -28,7 +28,7 @@ use crate::{
MIKAN_YEAR_QUERY_KEY, MikanClient, MIKAN_YEAR_QUERY_KEY, MikanClient,
}, },
}, },
storage::{StorageContentCategory, StorageServiceTrait}, storage::{StorageContentCategory, StorageService},
}; };
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@ -739,18 +739,20 @@ pub async fn scrape_mikan_poster_data_from_image_url(
#[instrument(skip_all, fields(origin_poster_src_url = origin_poster_src_url.as_str()))] #[instrument(skip_all, fields(origin_poster_src_url = origin_poster_src_url.as_str()))]
pub async fn scrape_mikan_poster_meta_from_image_url( pub async fn scrape_mikan_poster_meta_from_image_url(
mikan_client: &MikanClient, mikan_client: &MikanClient,
storage_service: &dyn StorageServiceTrait, storage_service: &StorageService,
origin_poster_src_url: Url, origin_poster_src_url: Url,
subscriber_id: i32, subscriber_id: i32,
) -> RecorderResult<MikanBangumiPosterMeta> { ) -> RecorderResult<MikanBangumiPosterMeta> {
if let Some(poster_src) = storage_service if let Some(poster_src) = storage_service
.exists_object( .exists(
storage_service.build_subscriber_object_path(
StorageContentCategory::Image, StorageContentCategory::Image,
subscriber_id, subscriber_id,
Some(MIKAN_POSTER_BUCKET_KEY), MIKAN_POSTER_BUCKET_KEY,
&origin_poster_src_url &origin_poster_src_url
.path() .path()
.replace(&format!("{MIKAN_BANGUMI_POSTER_PATH}/"), ""), .replace(&format!("{MIKAN_BANGUMI_POSTER_PATH}/"), ""),
),
) )
.await? .await?
{ {
@ -765,13 +767,15 @@ pub async fn scrape_mikan_poster_meta_from_image_url(
.await?; .await?;
let poster_str = storage_service let poster_str = storage_service
.store_object( .write(
storage_service.build_subscriber_object_path(
StorageContentCategory::Image, StorageContentCategory::Image,
subscriber_id, subscriber_id,
Some(MIKAN_POSTER_BUCKET_KEY), MIKAN_POSTER_BUCKET_KEY,
&origin_poster_src_url &origin_poster_src_url
.path() .path()
.replace(&format!("{MIKAN_BANGUMI_POSTER_PATH}/"), ""), .replace(&format!("{MIKAN_BANGUMI_POSTER_PATH}/"), ""),
),
poster_data, poster_data,
) )
.await?; .await?;
@ -1086,10 +1090,10 @@ mod test {
resources_mock.shared_resource_mock.expect(1); resources_mock.shared_resource_mock.expect(1);
let storage_fullname = storage_service.get_fullname( let storage_fullname = storage_service.build_subscriber_object_path(
StorageContentCategory::Image, StorageContentCategory::Image,
1, 1,
Some(MIKAN_POSTER_BUCKET_KEY), MIKAN_POSTER_BUCKET_KEY,
"202309/5ce9fed1.jpg", "202309/5ce9fed1.jpg",
); );
let storage_fullename_str = storage_fullname.as_str(); let storage_fullename_str = storage_fullname.as_str();

View File

@ -1,8 +1,8 @@
use std::fmt; use std::fmt;
use bytes::Bytes; use bytes::Bytes;
use opendal::{Buffer, Operator, layers::LoggingLayer}; use opendal::{Buffer, Metadata, Operator, Reader, Writer, layers::LoggingLayer};
use quirks_path::{Path, PathBuf}; use quirks_path::PathBuf;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@ -43,88 +43,6 @@ impl fmt::Display for StorageStoredUrl {
} }
} }
#[async_trait::async_trait]
pub trait StorageServiceTrait: Sync {
fn get_operator(&self) -> RecorderResult<Operator>;
fn get_fullname(
&self,
content_category: StorageContentCategory,
subscriber_id: i32,
bucket: Option<&str>,
filename: &str,
) -> PathBuf {
[
&subscriber_id.to_string(),
content_category.as_ref(),
bucket.unwrap_or_default(),
filename,
]
.into_iter()
.map(Path::new)
.collect::<PathBuf>()
}
async fn store_object(
&self,
content_category: StorageContentCategory,
subscriber_id: i32,
bucket: Option<&str>,
filename: &str,
data: Bytes,
) -> RecorderResult<StorageStoredUrl> {
let fullname = self.get_fullname(content_category, subscriber_id, bucket, filename);
let operator = self.get_operator()?;
if let Some(dirname) = fullname.parent() {
let dirname = dirname.join("/");
operator.create_dir(dirname.as_str()).await?;
}
operator.write(fullname.as_str(), data).await?;
Ok(StorageStoredUrl::RelativePath {
path: fullname.to_string(),
})
}
async fn exists_object(
&self,
content_category: StorageContentCategory,
subscriber_id: i32,
bucket: Option<&str>,
filename: &str,
) -> RecorderResult<Option<StorageStoredUrl>> {
let fullname = self.get_fullname(content_category, subscriber_id, bucket, filename);
let operator = self.get_operator()?;
if operator.exists(fullname.as_str()).await? {
Ok(Some(StorageStoredUrl::RelativePath {
path: fullname.to_string(),
}))
} else {
Ok(None)
}
}
async fn load_object(
&self,
content_category: StorageContentCategory,
subscriber_id: i32,
bucket: Option<&str>,
filename: &str,
) -> RecorderResult<Buffer> {
let fullname = self.get_fullname(content_category, subscriber_id, bucket, filename);
let operator = self.get_operator()?;
let data = operator.read(fullname.as_str()).await?;
Ok(data)
}
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct StorageService { pub struct StorageService {
pub data_dir: String, pub data_dir: String,
@ -136,15 +54,106 @@ impl StorageService {
data_dir: config.data_dir.to_string(), data_dir: config.data_dir.to_string(),
}) })
} }
}
#[async_trait::async_trait] pub fn get_operator(&self) -> Result<Operator, opendal::Error> {
impl StorageServiceTrait for StorageService { let op = if cfg!(test) {
fn get_operator(&self) -> RecorderResult<Operator> { Operator::new(opendal::services::Memory::default())?
let fs_op = Operator::new(opendal::services::Fs::default().root(&self.data_dir))?
.layer(LoggingLayer::default()) .layer(LoggingLayer::default())
.finish(); .finish()
} else {
Operator::new(opendal::services::Fs::default().root(&self.data_dir))?
.layer(LoggingLayer::default())
.finish()
};
Ok(fs_op) Ok(op)
}
pub fn build_subscriber_path(&self, subscriber_id: i32, path: &str) -> PathBuf {
let mut p = PathBuf::from("/subscribers");
p.push(subscriber_id.to_string());
p.push(path);
p
}
pub fn build_subscriber_object_path(
&self,
content_category: StorageContentCategory,
subscriber_id: i32,
bucket: &str,
object_name: &str,
) -> PathBuf {
self.build_subscriber_path(
subscriber_id,
&format!("{}/{}/{}", content_category.as_ref(), bucket, object_name),
)
}
pub async fn write<P: Into<PathBuf> + Send>(
&self,
path: P,
data: Bytes,
) -> Result<StorageStoredUrl, opendal::Error> {
let operator = self.get_operator()?;
let path = path.into();
if let Some(dirname) = path.parent() {
let dirname = dirname.join("/");
operator.create_dir(dirname.as_str()).await?;
}
operator.write(path.as_str(), data).await?;
Ok(StorageStoredUrl::RelativePath {
path: path.to_string(),
})
}
pub async fn exists<P: ToString + Send>(
&self,
path: P,
) -> Result<Option<StorageStoredUrl>, opendal::Error> {
let operator = self.get_operator()?;
let path = path.to_string();
if operator.exists(&path).await? {
Ok(Some(StorageStoredUrl::RelativePath { path }))
} else {
Ok(None)
}
}
pub async fn read(&self, path: impl AsRef<str>) -> Result<Buffer, opendal::Error> {
let operator = self.get_operator()?;
let data = operator.read(path.as_ref()).await?;
Ok(data)
}
pub async fn reader(&self, path: impl AsRef<str>) -> Result<Reader, opendal::Error> {
let operator = self.get_operator()?;
let reader = operator.reader(path.as_ref()).await?;
Ok(reader)
}
pub async fn writer(&self, path: impl AsRef<str>) -> Result<Writer, opendal::Error> {
let operator = self.get_operator()?;
let writer = operator.writer(path.as_ref()).await?;
Ok(writer)
}
pub async fn stat(&self, path: impl AsRef<str>) -> Result<Metadata, opendal::Error> {
let operator = self.get_operator()?;
let metadata = operator.stat(path.as_ref()).await?;
Ok(metadata)
} }
} }

View File

@ -1,4 +1,4 @@
mod client; mod client;
mod config; mod config;
pub use client::{StorageContentCategory, StorageService, StorageServiceTrait, StorageStoredUrl}; pub use client::{StorageContentCategory, StorageService, StorageStoredUrl};
pub use config::StorageConfig; pub use config::StorageConfig;

View File

@ -3,7 +3,7 @@ use std::{fmt::Debug, sync::Arc};
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use typed_builder::TypedBuilder; use typed_builder::TypedBuilder;
use crate::{app::AppContextTrait, test_utils::storage::TestingStorageService}; use crate::app::AppContextTrait;
#[derive(TypedBuilder)] #[derive(TypedBuilder)]
#[builder(field_defaults(default, setter(strip_option)))] #[builder(field_defaults(default, setter(strip_option)))]
@ -15,7 +15,7 @@ pub struct TestingAppContext {
mikan: Option<crate::extract::mikan::MikanClient>, mikan: Option<crate::extract::mikan::MikanClient>,
auth: Option<crate::auth::AuthService>, auth: Option<crate::auth::AuthService>,
graphql: Option<crate::graphql::GraphQLService>, graphql: Option<crate::graphql::GraphQLService>,
storage: Option<TestingStorageService>, storage: Option<crate::storage::StorageService>,
crypto: Option<crate::crypto::CryptoService>, crypto: Option<crate::crypto::CryptoService>,
#[builder(default = Arc::new(OnceCell::new()), setter(!strip_option))] #[builder(default = Arc::new(OnceCell::new()), setter(!strip_option))]
task: Arc<OnceCell<crate::task::TaskService>>, task: Arc<OnceCell<crate::task::TaskService>>,
@ -67,7 +67,7 @@ impl AppContextTrait for TestingAppContext {
self.graphql.as_ref().expect("should set graphql") self.graphql.as_ref().expect("should set graphql")
} }
fn storage(&self) -> &dyn crate::storage::StorageServiceTrait { fn storage(&self) -> &crate::storage::StorageService {
self.storage.as_ref().expect("should set storage") self.storage.as_ref().expect("should set storage")
} }

View File

@ -1,28 +1,13 @@
use opendal::{Operator, layers::LoggingLayer}; use crate::{
errors::RecorderResult,
storage::{StorageConfig, StorageService},
};
use crate::{errors::RecorderResult, storage::StorageServiceTrait}; pub async fn build_testing_storage_service() -> RecorderResult<StorageService> {
let service = StorageService::from_config(StorageConfig {
data_dir: "tests/data".to_string(),
})
.await?;
pub struct TestingStorageService { Ok(service)
operator: Operator,
}
impl TestingStorageService {
pub fn new() -> RecorderResult<Self> {
let op = Operator::new(opendal::services::Memory::default())?
.layer(LoggingLayer::default())
.finish();
Ok(Self { operator: op })
}
}
#[async_trait::async_trait]
impl StorageServiceTrait for TestingStorageService {
fn get_operator(&self) -> RecorderResult<Operator> {
Ok(self.operator.clone())
}
}
pub async fn build_testing_storage_service() -> RecorderResult<TestingStorageService> {
TestingStorageService::new()
} }

View File

@ -0,0 +1,18 @@
use std::ops::Bound;
pub fn bound_range_to_content_range(
r: &(Bound<u64>, Bound<u64>),
l: u64,
) -> Result<String, String> {
match r {
(Bound::Included(start), Bound::Included(end)) => Ok(format!("bytes {start}-{end}/{l}")),
(Bound::Included(start), Bound::Excluded(end)) => {
Ok(format!("bytes {start}-{}/{l}", end - 1))
}
(Bound::Included(start), Bound::Unbounded) => Ok(format!(
"bytes {start}-{}/{l}",
if l > 0 { l - 1 } else { 0 }
)),
_ => Err(format!("bytes */{l}")),
}
}

View File

@ -1 +1,2 @@
pub mod http;
pub mod json; pub mod json;

View File

@ -2,5 +2,6 @@ pub mod core;
pub mod graphql; pub mod graphql;
pub mod metadata; pub mod metadata;
pub mod oidc; pub mod oidc;
pub mod r#static;
pub use core::{Controller, ControllerTrait, PrefixController}; pub use core::{Controller, ControllerTrait, PrefixController};

View File

@ -0,0 +1,145 @@
use std::sync::Arc;
use async_stream::try_stream;
use axum::{
Extension, Router,
body::Body,
extract::{Path, State},
middleware::from_fn_with_state,
response::Response,
routing::get,
};
use axum_extra::{TypedHeader, headers::Range};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use http::{HeaderMap, HeaderValue, StatusCode, header};
use itertools::Itertools;
use uuid::Uuid;
use crate::{
app::AppContextTrait,
auth::{AuthError, AuthUserInfo, auth_middleware},
errors::{RecorderError, RecorderResult},
utils::http::bound_range_to_content_range,
web::controller::Controller,
};
pub const CONTROLLER_PREFIX: &str = "/api/static";
async fn serve_file_with_cache(
State(ctx): State<Arc<dyn AppContextTrait>>,
Path((subscriber_id, path)): Path<(i32, String)>,
Extension(auth_user_info): Extension<AuthUserInfo>,
range: Option<TypedHeader<Range>>,
) -> RecorderResult<Response> {
if subscriber_id != auth_user_info.subscriber_auth.id {
Err(AuthError::PermissionError)?;
}
let storage = ctx.storage();
let storage_path = storage.build_subscriber_path(subscriber_id, &path);
let metadata = storage
.stat(&storage_path)
.await
.map_err(|_| RecorderError::from_status(StatusCode::NOT_FOUND))?;
if !metadata.is_file() {
return Err(RecorderError::from_status(StatusCode::NOT_FOUND));
}
let mime_type = mime_guess::from_path(&path).first_or_octet_stream();
let response = if let Some(TypedHeader(range)) = range {
let ranges = range
.satisfiable_ranges(metadata.content_length())
.collect_vec();
if ranges.is_empty() {
Response::builder()
.status(StatusCode::PARTIAL_CONTENT)
.header(header::CONTENT_TYPE, mime_type.as_ref())
.body(Body::empty())?
} else if ranges.len() == 1 {
let r = ranges[0];
let reader = storage.reader(&storage_path).await?;
let content_range = bound_range_to_content_range(&r, metadata.content_length())
.map_err(|s| {
RecorderError::from_status_and_headers(
StatusCode::RANGE_NOT_SATISFIABLE,
HeaderMap::from_iter(
[(header::CONTENT_RANGE, HeaderValue::from_str(&s).unwrap())]
.into_iter(),
),
)
})?;
let stream = reader.into_bytes_stream(r).await?;
Response::builder()
.status(StatusCode::PARTIAL_CONTENT)
.header(header::CONTENT_TYPE, mime_type.as_ref())
.header(header::CONTENT_RANGE, content_range)
.body(Body::from_stream(stream))?
} else {
let boundary = Uuid::new_v4().to_string();
let reader = storage.reader(&storage_path).await?;
let stream: impl Stream<Item = Result<Bytes, RecorderError>> = {
let boundary = boundary.clone();
try_stream! {
for r in ranges {
let content_range = bound_range_to_content_range(&r, metadata.content_length())
.map_err(|s| {
RecorderError::from_status_and_headers(
StatusCode::RANGE_NOT_SATISFIABLE,
HeaderMap::from_iter([(header::CONTENT_RANGE, HeaderValue::from_str(&s).unwrap())].into_iter()),
)
})?;
let part_header = format!("--{boundary}\r\nContent-Type: {}\r\nContent-Range: {}\r\n\r\n",
mime_type.as_ref(),
content_range,
);
yield part_header.into();
let mut part_stream = reader.clone().into_bytes_stream(r).await?;
while let Some(chunk) = part_stream.next().await {
yield chunk?;
}
yield "\r\n".into();
}
yield format!("--{boundary}--").into();
}
};
let body = Body::from_stream(stream);
Response::builder()
.status(StatusCode::PARTIAL_CONTENT)
.header(
header::CONTENT_TYPE,
HeaderValue::from_str(
format!("multipart/byteranges; boundary={boundary}").as_str(),
)
.unwrap(),
)
.body(body)?
}
} else {
let reader = storage.reader(&storage_path).await?;
let stream = reader.into_bytes_stream(..).await?;
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, mime_type.as_ref())
.body(Body::from_stream(stream))?
};
Ok(response)
}
pub async fn create(ctx: Arc<dyn AppContextTrait>) -> RecorderResult<Controller> {
let router = Router::<Arc<dyn AppContextTrait>>::new().route(
"/subscribers/{subscriber_id}/*path",
get(serve_file_with_cache).layer(from_fn_with_state(ctx, auth_middleware)),
);
Ok(Controller::from_prefix(CONTROLLER_PREFIX, router))
}