use std::{fmt, error::Error, time::Duration, any::Any, panic::AssertUnwindSafe, collections::HashMap, sync::Arc}; use async_trait::async_trait; use futures::{stream::FuturesUnordered, StreamExt, FutureExt}; use log::{warn, error}; use serde::Deserialize; #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct Id(&'static str); impl Id { pub fn new(s: String) -> Self { Self(Box::leak(s.into_boxed_str())) } } impl fmt::Display for Id { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(self.0) } } #[derive(Clone, Debug, Eq, PartialEq, Hash, Deserialize)] pub struct Link(String); impl From for Link where S: Into { fn from(value: S) -> Self { Self(value.into()) } } #[derive(Clone, Debug)] pub struct Message { pub origin: (Id, String), pub link: Link, pub author: String, pub suffix: Arc, pub content: String, pub avatar: Option, } pub type Sender = tokio::sync::broadcast::Sender; pub type Receiver = tokio::sync::broadcast::Receiver; pub type TaskResult = Result<(), Box>; #[async_trait] pub trait Task { async fn start(&self, origin: Id, tx: Sender, rc: Receiver) -> TaskResult; fn restart_timeout(&self) -> Option { Some(Duration::from_secs(15)) } } enum ExitStatus { Success, Error(Box), Panic(Box), } async fn start_task(id: Id, task: &dyn Task, tx: Sender, timeout: Duration) -> (Id, ExitStatus) { if !timeout.is_zero() { warn!("task '{id}': waiting {timeout:?} to start"); tokio::time::sleep(timeout).await; } warn!("task '{id}': starting"); let rx = tx.subscribe(); let future = AssertUnwindSafe(task.start(id, tx, rx)).catch_unwind(); let result = match future.await { Ok(Ok(_)) => ExitStatus::Success, Ok(Err(e)) => ExitStatus::Error(e), Err(e) => ExitStatus::Panic(e), }; (id, result) } pub async fn run_tasks(tasks: HashMap>) { let mut futures = FuturesUnordered::new(); let (tx, _) = tokio::sync::broadcast::channel(64); for (id, task) in &tasks { let id = Id::new(id.clone()); futures.push(start_task(id, task.as_ref(), tx.clone(), Duration::ZERO)); } while let Some((id, result)) = futures.next().await { let task = &tasks[id.0]; let dur = task.restart_timeout(); match &result { ExitStatus::Success => warn!("task '{id}' exited without error"), ExitStatus::Error(e) => warn!("task '{id}': exited with error: {e}"), ExitStatus::Panic(_) => error!("task '{id}': panicked"), } if let Some(dur) = dur { futures.push(start_task(id, task.as_ref(), tx.clone(), dur)); } } }