use std::{borrow::Cow, sync::Arc}; use async_stream::stream; use futures::{Stream, StreamExt, pin_mut}; use serde::{Serialize, de::DeserializeOwned}; use tokio::sync::{RwLock, mpsc}; use crate::{ app::AppContextTrait, errors::app_error::{RecorderError, RecorderResult}, models, }; pub struct TaskMeta { pub subscriber_id: i32, pub task_id: i32, pub task_kind: Cow<'static, str>, } pub struct ReplayChannel { sender: mpsc::UnboundedSender, channels: Arc>>>, buffer: Arc>>, } impl ReplayChannel { pub fn new(history: Vec) -> Self { let (tx, mut rx) = mpsc::unbounded_channel::(); let channels = Arc::new(RwLock::new(Vec::>::new())); let buffer = Arc::new(RwLock::new(history)); { let channels = channels.clone(); let buffer = buffer.clone(); tokio::spawn(async move { loop { match rx.recv().await { Some(value) => { let mut w = buffer.write().await; let senders = channels.read().await; for s in senders.iter() { if !s.is_closed() { if let Err(err) = s.send(value.clone()) { tracing::error!(err = %err, "replay-channel broadcast to other subscribers error"); } } } w.push(value); } None => { drop(rx); let mut cs = channels.write().await; cs.clear(); break; } } } }); } Self { sender: tx, channels, buffer, } } pub fn sender(&self) -> &mpsc::UnboundedSender { &self.sender } pub async fn receiver(&self) -> mpsc::UnboundedReceiver { let (tx, rx) = mpsc::unbounded_channel(); let items = self.buffer.read().await; for item in items.iter() { if let Err(err) = tx.send(item.clone()) { tracing::error!(err = %err, "replay-channel send replay value to other subscribers error"); } } if !self.sender.is_closed() { let mut sw = self.channels.write().await; sw.push(tx); } rx } pub async fn close(&self) { let mut senders = self.channels.write().await; senders.clear(); } } pub trait StreamTaskCoreTrait: Sized { type Request: Serialize + DeserializeOwned; type Item: Serialize + DeserializeOwned; fn task_id(&self) -> i32; fn task_kind(&self) -> &str; fn new(meta: TaskMeta, request: Self::Request) -> Self; fn request(&self) -> &Self::Request; } pub trait StreamTaskReplayLayoutTrait: StreamTaskCoreTrait { fn history(&self) -> &[Arc>]; fn resume_from_model( task: models::tasks::Model, stream_items: Vec, ) -> RecorderResult; fn running_receiver( &self, ) -> impl Future>>>>; #[allow(clippy::type_complexity)] fn init_receiver( &self, ) -> impl Future< Output = ( mpsc::UnboundedSender>>, mpsc::UnboundedReceiver>>, ), >; fn serialize_request(request: Self::Request) -> RecorderResult { serde_json::to_value(request).map_err(RecorderError::from) } fn serialize_item(item: RecorderResult) -> RecorderResult { serde_json::to_value(item).map_err(RecorderError::from) } fn deserialize_request(request: serde_json::Value) -> RecorderResult { serde_json::from_value(request).map_err(RecorderError::from) } fn deserialize_item(item: serde_json::Value) -> RecorderResult> { serde_json::from_value(item).map_err(RecorderError::from) } } pub trait StreamTaskRunnerTrait: StreamTaskCoreTrait { fn run( context: Arc, request: &Self::Request, history: &[Arc>], ) -> impl Stream>; } pub trait StreamTaskReplayRunnerTrait: StreamTaskRunnerTrait + StreamTaskReplayLayoutTrait { fn run_shared( &self, context: Arc, ) -> impl Stream>> { stream! { if let Some(mut receiver) = self.running_receiver().await { while let Some(item) = receiver.recv().await { yield item } } else { let (tx, _) = self.init_receiver().await; let stream = Self::run(context, self.request(), self.history()); pin_mut!(stream); while let Some(item) = stream.next().await { let item = Arc::new(item); if let Err(err) = tx.send(item.clone()) { tracing::error!(task_id = self.task_id(), task_kind = self.task_kind(), err = %err, "run shared send error"); } yield item } }; } } } pub struct StandardStreamTaskReplayLayout where Request: Serialize + DeserializeOwned, Item: Serialize + DeserializeOwned + Sync + Send + 'static, { pub meta: TaskMeta, pub request: Request, pub history: Vec>>, #[allow(clippy::type_complexity)] pub channel: Arc>>>>>, } impl StreamTaskCoreTrait for StandardStreamTaskReplayLayout where Request: Serialize + DeserializeOwned, Item: Serialize + DeserializeOwned + Sync + Send + 'static, { type Request = Request; type Item = Item; fn task_id(&self) -> i32 { self.meta.task_id } fn request(&self) -> &Self::Request { &self.request } fn task_kind(&self) -> &str { &self.meta.task_kind } fn new(meta: TaskMeta, request: Self::Request) -> Self { Self { meta, request, history: vec![], channel: Arc::new(RwLock::new(None)), } } } impl StreamTaskReplayLayoutTrait for StandardStreamTaskReplayLayout where Request: Serialize + DeserializeOwned, Item: Serialize + DeserializeOwned + Sync + Send + 'static, { fn history(&self) -> &[Arc>] { &self.history } fn resume_from_model( task: models::tasks::Model, stream_items: Vec, ) -> RecorderResult { Ok(Self { meta: TaskMeta { task_id: task.id, subscriber_id: task.subscriber_id, task_kind: Cow::Owned(task.task_type), }, request: Self::deserialize_request(task.request_data)?, history: stream_items .into_iter() .map(|m| Self::deserialize_item(m.item).map(Arc::new)) .collect::>>()?, channel: Arc::new(RwLock::new(None)), }) } async fn running_receiver( &self, ) -> Option>>> { if let Some(channel) = self.channel.read().await.as_ref() { Some(channel.receiver().await) } else { None } } async fn init_receiver( &self, ) -> ( mpsc::UnboundedSender>>, mpsc::UnboundedReceiver>>, ) { let channel = ReplayChannel::new(self.history.clone()); let rx = channel.receiver().await; let sender = channel.sender().clone(); { { let mut w = self.channel.write().await; *w = Some(channel); } } (sender, rx) } }