abridged/src/supervisor.rs

98 lines
2.5 KiB
Rust

use std::{fmt, error::Error, time::Duration, any::Any, panic::AssertUnwindSafe, collections::HashMap};
use async_trait::async_trait;
use futures::{stream::FuturesUnordered, StreamExt, FutureExt};
use log::{warn, error};
use serde::Deserialize;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Id(&'static str);
impl Id {
pub fn new(s: String) -> Self {
Self(Box::leak(s.into_boxed_str()))
}
}
impl fmt::Display for Id {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.0)
}
}
#[derive(Clone, Debug, Eq, PartialEq, Hash, Deserialize)]
pub struct Link(String);
impl <S> From<S> for Link
where S: Into<String> {
fn from(value: S) -> Self {
Self(value.into())
}
}
#[derive(Clone, Debug)]
pub struct Message {
pub origin: (Id, String),
pub link: Link,
pub author: String,
pub content: String,
pub avatar: Option<String>,
}
pub type Sender = tokio::sync::broadcast::Sender<Message>;
pub type Receiver = tokio::sync::broadcast::Receiver<Message>;
pub type TaskResult = Result<(), Box<dyn Error>>;
#[async_trait]
pub trait Task {
async fn start(&self, origin: Id, tx: Sender, rc: Receiver) -> TaskResult;
fn restart_timeout(&self) -> Option<Duration> {
Some(Duration::from_secs(15))
}
}
enum ExitStatus {
Success,
Error(Box<dyn Error>),
Panic(Box<dyn Any + 'static>),
}
async fn start_task(id: Id, task: &dyn Task, tx: Sender, timeout: Duration) -> (Id, ExitStatus) {
if !timeout.is_zero() {
warn!("task '{id}': waiting {timeout:?} to start");
tokio::time::sleep(timeout).await;
}
warn!("task '{id}': starting");
let rx = tx.subscribe();
let future = AssertUnwindSafe(task.start(id, tx, rx)).catch_unwind();
let result = match future.await {
Ok(Ok(_)) => ExitStatus::Success,
Ok(Err(e)) => ExitStatus::Error(e),
Err(e) => ExitStatus::Panic(e),
};
(id, result)
}
pub async fn run_tasks(tasks: HashMap<String, Box<dyn Task>>) {
let mut futures = FuturesUnordered::new();
let (tx, _) = tokio::sync::broadcast::channel(64);
for (id, task) in &tasks {
let id = Id::new(id.clone());
futures.push(start_task(id, task.as_ref(), tx.clone(), Duration::ZERO));
}
while let Some((id, result)) = futures.next().await {
let task = &tasks[id.0];
let dur = task.restart_timeout();
match &result {
ExitStatus::Success => warn!("task '{id}' exited without error"),
ExitStatus::Error(e) => warn!("task '{id}': exited with error: {e}"),
ExitStatus::Panic(_) => error!("task '{id}': panicked"),
}
if let Some(dur) = dur {
futures.push(start_task(id, task.as_ref(), tx.clone(), dur));
}
}
}