made changes
This commit is contained in:
parent
b0a8f8ce73
commit
1cc19478e3
17 changed files with 530 additions and 280 deletions
68
Cargo.lock
generated
68
Cargo.lock
generated
|
@ -329,7 +329,6 @@ dependencies = [
|
||||||
"js-sys",
|
"js-sys",
|
||||||
"num-integer",
|
"num-integer",
|
||||||
"num-traits",
|
"num-traits",
|
||||||
"serde",
|
|
||||||
"time 0.1.45",
|
"time 0.1.45",
|
||||||
"wasm-bindgen",
|
"wasm-bindgen",
|
||||||
"winapi",
|
"winapi",
|
||||||
|
@ -363,17 +362,6 @@ dependencies = [
|
||||||
"bitflags",
|
"bitflags",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "command_attr"
|
|
||||||
version = "0.4.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "4d999d4e7731150ee14aee8f619c7a9aa9a4385bca0606c4fa95aa2f36a05d9a"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2 1.0.56",
|
|
||||||
"quote 1.0.27",
|
|
||||||
"syn 1.0.109",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "const-oid"
|
name = "const-oid"
|
||||||
version = "0.7.1"
|
version = "0.7.1"
|
||||||
|
@ -1251,7 +1239,7 @@ dependencies = [
|
||||||
"tokio-native-tls",
|
"tokio-native-tls",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"tokio-util 0.6.10",
|
"tokio-util 0.6.10",
|
||||||
"toml",
|
"toml 0.5.11",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -1327,12 +1315,6 @@ version = "1.4.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "levenshtein"
|
|
||||||
version = "1.0.5"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "db13adb97ab515a3691f56e4dbab09283d0b86cb45abd991d8634a9d6f501760"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libc"
|
name = "libc"
|
||||||
version = "0.2.144"
|
version = "0.2.144"
|
||||||
|
@ -1620,8 +1602,11 @@ dependencies = [
|
||||||
"irc",
|
"irc",
|
||||||
"log",
|
"log",
|
||||||
"matrix-sdk",
|
"matrix-sdk",
|
||||||
|
"serde",
|
||||||
|
"serde_derive",
|
||||||
"serenity",
|
"serenity",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"toml 0.7.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -2324,7 +2309,7 @@ dependencies = [
|
||||||
"ruma-identifiers-validation",
|
"ruma-identifiers-validation",
|
||||||
"serde",
|
"serde",
|
||||||
"syn 1.0.109",
|
"syn 1.0.109",
|
||||||
"toml",
|
"toml 0.5.11",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -2488,6 +2473,15 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_spanned"
|
||||||
|
version = "0.6.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "93107647184f6027e3b7dcb2e11034cf95ffa1e3a682c67951963ac69c1c007d"
|
||||||
|
dependencies = [
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_urlencoded"
|
name = "serde_urlencoded"
|
||||||
version = "0.7.1"
|
version = "0.7.1"
|
||||||
|
@ -2512,12 +2506,9 @@ dependencies = [
|
||||||
"bitflags",
|
"bitflags",
|
||||||
"bytes",
|
"bytes",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"chrono",
|
|
||||||
"command_attr",
|
|
||||||
"dashmap",
|
"dashmap",
|
||||||
"flate2",
|
"flate2",
|
||||||
"futures",
|
"futures",
|
||||||
"levenshtein",
|
|
||||||
"mime",
|
"mime",
|
||||||
"mime_guess",
|
"mime_guess",
|
||||||
"parking_lot 0.12.1",
|
"parking_lot 0.12.1",
|
||||||
|
@ -2526,13 +2517,11 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"serde-value",
|
"serde-value",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"static_assertions",
|
|
||||||
"time 0.3.21",
|
"time 0.3.21",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
"typemap_rev",
|
"typemap_rev",
|
||||||
"url",
|
"url",
|
||||||
"uwl",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -2632,12 +2621,6 @@ dependencies = [
|
||||||
"der",
|
"der",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "static_assertions"
|
|
||||||
version = "1.1.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "strsim"
|
name = "strsim"
|
||||||
version = "0.10.0"
|
version = "0.10.0"
|
||||||
|
@ -2885,11 +2868,26 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "toml"
|
||||||
|
version = "0.7.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec"
|
||||||
|
dependencies = [
|
||||||
|
"serde",
|
||||||
|
"serde_spanned",
|
||||||
|
"toml_datetime",
|
||||||
|
"toml_edit",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "toml_datetime"
|
name = "toml_datetime"
|
||||||
version = "0.6.2"
|
version = "0.6.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5a76a9312f5ba4c2dec6b9161fdf25d87ad8a09256ccea5a556fef03c706a10f"
|
checksum = "5a76a9312f5ba4c2dec6b9161fdf25d87ad8a09256ccea5a556fef03c706a10f"
|
||||||
|
dependencies = [
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "toml_edit"
|
name = "toml_edit"
|
||||||
|
@ -2898,6 +2896,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "92d964908cec0d030b812013af25a0e57fddfadb1e066ecc6681d86253129d4f"
|
checksum = "92d964908cec0d030b812013af25a0e57fddfadb1e066ecc6681d86253129d4f"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"indexmap",
|
"indexmap",
|
||||||
|
"serde",
|
||||||
|
"serde_spanned",
|
||||||
"toml_datetime",
|
"toml_datetime",
|
||||||
"winnow",
|
"winnow",
|
||||||
]
|
]
|
||||||
|
@ -3078,12 +3078,6 @@ dependencies = [
|
||||||
"wasm-bindgen",
|
"wasm-bindgen",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "uwl"
|
|
||||||
version = "0.6.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "f4bf03e0ca70d626ecc4ba6b0763b934b6f2976e8c744088bb3c1d646fbb1ad0"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "vcpkg"
|
name = "vcpkg"
|
||||||
version = "0.2.15"
|
version = "0.2.15"
|
||||||
|
|
29
Cargo.toml
29
Cargo.toml
|
@ -5,13 +5,32 @@ edition = "2021"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[features]
|
||||||
|
irc = ["dep:irc"]
|
||||||
|
matrix = ["dep:matrix-sdk"]
|
||||||
|
discord = ["dep:serenity"]
|
||||||
|
default = ["irc", "matrix", "discord"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
env_logger = "0.10"
|
||||||
|
log = "0.4"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
tokio = { version = "1.28", features = ["rt-multi-thread", "macros", "time", "sync"] }
|
tokio = { version = "1.28", features = ["rt-multi-thread", "macros", "time", "sync"] }
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
env_logger = "0.10"
|
toml = "0.7"
|
||||||
log = "0.4"
|
serde = "1.0"
|
||||||
|
serde_derive = "1.0"
|
||||||
|
|
||||||
irc = "0.15"
|
[dependencies.irc]
|
||||||
matrix-sdk = "0.6"
|
version = "0.15"
|
||||||
serenity = "0.11"
|
optional = true
|
||||||
|
|
||||||
|
[dependencies.matrix-sdk]
|
||||||
|
version = "0.6"
|
||||||
|
optional = true
|
||||||
|
|
||||||
|
[dependencies.serenity]
|
||||||
|
version = "0.11"
|
||||||
|
optional = true
|
||||||
|
default-features = false
|
||||||
|
features = ["client", "cache", "gateway", "rustls_backend", "model"]
|
||||||
|
|
34
example-config.toml
Normal file
34
example-config.toml
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
#=-- example irc node --=#
|
||||||
|
[nodes.a]
|
||||||
|
platform = "irc"
|
||||||
|
|
||||||
|
[nodes.a.config]
|
||||||
|
server = "example.com"
|
||||||
|
port = 6667
|
||||||
|
nick = "example"
|
||||||
|
alt_nicks = []
|
||||||
|
tls = true
|
||||||
|
|
||||||
|
[nodes.a.links]
|
||||||
|
"#example" = "example-link"
|
||||||
|
|
||||||
|
#=-- example matrix node --=#
|
||||||
|
[nodes.b]
|
||||||
|
platform = "matrix"
|
||||||
|
|
||||||
|
[nodes.b.config]
|
||||||
|
user = "@example:example.com"
|
||||||
|
password = "hunter2"
|
||||||
|
|
||||||
|
[nodes.b.links]
|
||||||
|
"!abcdefghijklmnopqr:example.com" = "example-link"
|
||||||
|
|
||||||
|
#=-- example discord node --=#
|
||||||
|
[nodes.c]
|
||||||
|
platform = "discord"
|
||||||
|
|
||||||
|
[nodes.c.config]
|
||||||
|
token = "MTAyNTYzNDA0NDkxNjAxMTA4OQ.G7CojH.TMB1DrSuncWUOU7M4ELDyMpmEdCaziZTDhoWt4"
|
||||||
|
|
||||||
|
[nodes.c.links]
|
||||||
|
"123456789012345678" = "example-link"
|
79
src/bridge_discord/bridge.rs
Normal file
79
src/bridge_discord/bridge.rs
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use serenity::{prelude::*, model::prelude::*, http::Http};
|
||||||
|
|
||||||
|
use crate::{linkmap::Linkmap, supervisor::{Task, TaskResult, Id, Sender, Receiver}};
|
||||||
|
use crate::supervisor::Message as BMessage;
|
||||||
|
|
||||||
|
use super::DiscordConfig;
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct DiscordTask {
|
||||||
|
config: DiscordConfig,
|
||||||
|
links: Arc<Linkmap<ChannelId>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Task for DiscordTask {
|
||||||
|
async fn start(&self, id: Id, tx: Sender, mut rx: Receiver) -> TaskResult {
|
||||||
|
let handler = Handler {
|
||||||
|
links: self.links.clone(),
|
||||||
|
tx,
|
||||||
|
id,
|
||||||
|
};
|
||||||
|
let intents = GatewayIntents::GUILD_MESSAGES | GatewayIntents::MESSAGE_CONTENT;
|
||||||
|
let mut client = Client::builder(&self.config.token, intents)
|
||||||
|
.event_handler(handler)
|
||||||
|
.await?;
|
||||||
|
let http = client.cache_and_http.http.clone();
|
||||||
|
let bridge_handler = async move {
|
||||||
|
loop {
|
||||||
|
let msg = rx.recv().await?;
|
||||||
|
handle_bridge_msg(msg, id, &self.links, http.as_ref()).await?;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
tokio::select! {
|
||||||
|
result = bridge_handler => result,
|
||||||
|
result = client.start() => result.map_err(|e| e.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_bridge_msg(msg: BMessage, id: Id, links: &Linkmap<ChannelId>, http: &Http) -> TaskResult {
|
||||||
|
let content = format!("<{}> {}", msg.author, msg.content);
|
||||||
|
for channel in links.get_channels(&msg.link) {
|
||||||
|
if msg.origin.0 == id && msg.origin.1 == channel.to_string() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
channel.say(http, &content).await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Handler {
|
||||||
|
id: Id,
|
||||||
|
links: Arc<Linkmap<ChannelId>>,
|
||||||
|
tx: Sender,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl EventHandler for Handler {
|
||||||
|
async fn message(&self, ctx: Context, msg: Message) {
|
||||||
|
if msg.author.id == ctx.cache.current_user_id() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
let Some(link) = self.links.get_link(&msg.channel_id) else { return };
|
||||||
|
self.tx.send(BMessage {
|
||||||
|
origin: (self.id, msg.channel_id.to_string()),
|
||||||
|
link: link.clone(),
|
||||||
|
author: msg.author.nick_in(ctx.http, msg.guild_id.unwrap()).await.unwrap_or(msg.author.name),
|
||||||
|
content: msg.content,
|
||||||
|
}).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn ready(&self, _: Context, _: Ready) {
|
||||||
|
println!("ready");
|
||||||
|
}
|
||||||
|
}
|
6
src/bridge_discord/config.rs
Normal file
6
src/bridge_discord/config.rs
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct DiscordConfig {
|
||||||
|
pub token: String,
|
||||||
|
}
|
5
src/bridge_discord/mod.rs
Normal file
5
src/bridge_discord/mod.rs
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
mod bridge;
|
||||||
|
mod config;
|
||||||
|
|
||||||
|
pub use bridge::DiscordTask;
|
||||||
|
pub use config::DiscordConfig;
|
73
src/bridge_irc/bridge.rs
Normal file
73
src/bridge_irc/bridge.rs
Normal file
|
@ -0,0 +1,73 @@
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use irc::client::prelude::Config;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use tokio::select;
|
||||||
|
use irc::{client::Client, proto::Command};
|
||||||
|
use crate::linkmap::Linkmap;
|
||||||
|
use crate::supervisor::Id;
|
||||||
|
use crate::{supervisor::{Message, Sender, Receiver, Task, TaskResult}};
|
||||||
|
use irc::proto::Message as IrcMessage;
|
||||||
|
|
||||||
|
use super::IrcConfig;
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct IrcTask {
|
||||||
|
config: IrcConfig,
|
||||||
|
links: Linkmap<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IrcTask {
|
||||||
|
async fn handle_bridge_msg(&self, id: Id, client: &mut Client, msg: Message) -> TaskResult {
|
||||||
|
for channel in self.links.get_channels(&msg.link) {
|
||||||
|
if msg.origin.0 == id && &msg.origin.1 == channel {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
client.send_privmsg(channel, format!("<{}> {}", msg.author, msg.content))?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_irc_msg(&self, id: Id, tx: &mut Sender, msg: IrcMessage) -> TaskResult {
|
||||||
|
if let Command::PRIVMSG(channel, message) = &msg.command {
|
||||||
|
let Some(link) = self.links.get_link(channel) else { return Ok(()) };
|
||||||
|
let Some(author) = msg.source_nickname() else { return Ok(()) };
|
||||||
|
tx.send(Message {
|
||||||
|
origin: (id, channel.clone()),
|
||||||
|
link: link.clone(),
|
||||||
|
author: author.to_owned(),
|
||||||
|
content: message.clone(),
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Task for IrcTask {
|
||||||
|
async fn start(&self, id: Id, mut tx: Sender, mut rx: Receiver) -> TaskResult {
|
||||||
|
let config = Config {
|
||||||
|
server: Some(self.config.server.clone()),
|
||||||
|
port: Some(self.config.port),
|
||||||
|
nickname: Some(self.config.nick.clone()),
|
||||||
|
alt_nicks: self.config.alt_nicks.clone(),
|
||||||
|
use_tls: Some(self.config.tls),
|
||||||
|
channels: self.links.iter_channels().cloned().collect(),
|
||||||
|
..Config::default()
|
||||||
|
};
|
||||||
|
let mut client = Client::from_config(config).await?;
|
||||||
|
client.identify()?;
|
||||||
|
|
||||||
|
let mut stream = client.stream()?;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
bridge_msg = rx.recv() => self.handle_bridge_msg(id, &mut client, bridge_msg?).await?,
|
||||||
|
irc_msg = stream.next() => match irc_msg {
|
||||||
|
Some(msg) => self.handle_irc_msg(id, &mut tx, msg?).await?,
|
||||||
|
None => return Err("Lost connection to IRC server".into()),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
10
src/bridge_irc/config.rs
Normal file
10
src/bridge_irc/config.rs
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
pub struct IrcConfig {
|
||||||
|
pub server: String,
|
||||||
|
pub port: u16,
|
||||||
|
pub tls: bool,
|
||||||
|
pub nick: String,
|
||||||
|
pub alt_nicks: Vec<String>,
|
||||||
|
}
|
|
@ -1,67 +1,5 @@
|
||||||
use async_trait::async_trait;
|
mod config;
|
||||||
use futures::StreamExt;
|
mod bridge;
|
||||||
use tokio::select;
|
|
||||||
use irc::{client::Client, proto::Command};
|
|
||||||
use crate::linkmap::Linkmap;
|
|
||||||
use crate::message::Id;
|
|
||||||
use crate::{message::{Message, Sender, Receiver}, supervisor::{Task, TaskResult}};
|
|
||||||
use irc::proto::Message as IrcMessage;
|
|
||||||
|
|
||||||
pub use irc::client::prelude::Config as IrcConfig;
|
pub use config::IrcConfig;
|
||||||
|
pub use bridge::IrcTask;
|
||||||
pub struct IrcTask {
|
|
||||||
config: IrcConfig,
|
|
||||||
channels: Linkmap<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl IrcTask {
|
|
||||||
pub fn new(config: IrcConfig, channels: Linkmap<String>) -> Self {
|
|
||||||
Self {
|
|
||||||
config, channels
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_bridge_msg(&self, id: Id, client: &mut Client, msg: Message) -> TaskResult {
|
|
||||||
for channel in self.channels.get_channels(&msg.link) {
|
|
||||||
if msg.origin.0 == id && &msg.origin.1 == channel {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
client.send_privmsg(channel, format!("<{}> {}", msg.author, msg.content))?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_irc_msg(&self, id: Id, tx: &mut Sender, msg: IrcMessage) -> TaskResult {
|
|
||||||
if let Command::PRIVMSG(channel, message) = &msg.command {
|
|
||||||
let Some(link) = self.channels.get_link(channel) else { return Ok(()) };
|
|
||||||
let Some(author) = msg.source_nickname() else { return Ok(()) };
|
|
||||||
tx.send(Message {
|
|
||||||
origin: (id, channel.to_owned()),
|
|
||||||
link: link.clone(),
|
|
||||||
author: author.to_owned(),
|
|
||||||
content: message.clone(),
|
|
||||||
})?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl Task for IrcTask {
|
|
||||||
async fn start(&self, id: Id, mut tx: Sender, mut rx: Receiver) -> TaskResult {
|
|
||||||
let mut client = Client::from_config(self.config.clone()).await?;
|
|
||||||
client.identify()?;
|
|
||||||
|
|
||||||
let mut stream = client.stream()?;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
select! {
|
|
||||||
bridge_msg = rx.recv() => self.handle_bridge_msg(id, &mut client, bridge_msg?).await?,
|
|
||||||
irc_msg = stream.next() => match irc_msg {
|
|
||||||
Some(msg) => self.handle_irc_msg(id, &mut tx, msg?).await?,
|
|
||||||
None => return Err("Lost connection to IRC server".into()),
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
96
src/bridge_matrix/bridge.rs
Normal file
96
src/bridge_matrix/bridge.rs
Normal file
|
@ -0,0 +1,96 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use matrix_sdk::Client;
|
||||||
|
use matrix_sdk::config::SyncSettings;
|
||||||
|
use matrix_sdk::event_handler::Ctx;
|
||||||
|
use matrix_sdk::room::Room;
|
||||||
|
use matrix_sdk::ruma::events::room::message::{SyncRoomMessageEvent, RoomMessageEventContent};
|
||||||
|
use matrix_sdk::ruma::RoomId;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use tokio::select;
|
||||||
|
use crate::linkmap::Linkmap;
|
||||||
|
use crate::supervisor::Message;
|
||||||
|
use crate::supervisor::Id;
|
||||||
|
use crate::{supervisor::{Sender, Receiver, Task, TaskResult}};
|
||||||
|
|
||||||
|
use super::MatrixConfig;
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct MatrixTask {
|
||||||
|
config: MatrixConfig,
|
||||||
|
links: Arc<Linkmap<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Context {
|
||||||
|
id: Id,
|
||||||
|
tx: Sender,
|
||||||
|
rooms: Arc<Linkmap<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Task for MatrixTask {
|
||||||
|
async fn start(&self, id: Id, tx: Sender, mut rx: Receiver) -> TaskResult {
|
||||||
|
let client = Client::builder()
|
||||||
|
.server_name(self.config.user.server_name())
|
||||||
|
.build().await?;
|
||||||
|
client.login_username(&self.config.user, &self.config.password)
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
client.add_event_handler_context(Arc::new(Context {
|
||||||
|
id,
|
||||||
|
tx,
|
||||||
|
rooms: self.links.clone()
|
||||||
|
}));
|
||||||
|
client.add_event_handler(handle_matrix_msg);
|
||||||
|
|
||||||
|
let mut sync_stream = Box::pin(client.sync_stream(SyncSettings::default()).await);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
response = sync_stream.next() => match response {
|
||||||
|
Some(Ok(_)) => (),
|
||||||
|
Some(Err(e)) => return Err(e.into()),
|
||||||
|
None => return Err("Lost connection to Matrix server".into())
|
||||||
|
},
|
||||||
|
message = rx.recv() => match message {
|
||||||
|
Ok(msg) => handle_bridge_msg(id, &client, &self.links, msg).await?,
|
||||||
|
Err(e) => return Err(e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_matrix_msg(ev: SyncRoomMessageEvent, client: Client, room: Room, ctx: Ctx<Arc<Context>>) -> TaskResult {
|
||||||
|
if Some(ev.sender()) == client.user_id() { return Ok(()) }
|
||||||
|
let SyncRoomMessageEvent::Original(ev) = ev else { return Ok(()) };
|
||||||
|
let Ok(Some(sender)) = room.get_member(&ev.sender).await else { return Ok(()) };
|
||||||
|
let Some(link) = ctx.rooms.get_link(room.room_id().as_str()) else { return Ok(()) };
|
||||||
|
let body = ev.content.body();
|
||||||
|
|
||||||
|
ctx.tx.send(Message {
|
||||||
|
origin: (ctx.id, room.room_id().to_string()),
|
||||||
|
link: link.clone(),
|
||||||
|
author: sender.name().to_owned(),
|
||||||
|
content: body.to_owned(),
|
||||||
|
})?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_bridge_msg(id: Id, client: &Client, links: &Linkmap<String>, msg: Message) -> TaskResult {
|
||||||
|
let content = RoomMessageEventContent::text_plain(
|
||||||
|
format!("<{}> {}", msg.author, msg.content)
|
||||||
|
);
|
||||||
|
for room in links.get_channels(&msg.link) {
|
||||||
|
if msg.origin.0 == id && &msg.origin.1 == room {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let Ok(room) = <&RoomId>::try_from(room.as_str()) else { continue };
|
||||||
|
let Some(Room::Joined(room)) = client.get_room(room) else { continue };
|
||||||
|
room.send(content.clone(), None).await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
8
src/bridge_matrix/config.rs
Normal file
8
src/bridge_matrix/config.rs
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
use matrix_sdk::ruma::OwnedUserId;
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct MatrixConfig {
|
||||||
|
pub user: OwnedUserId,
|
||||||
|
pub password: String,
|
||||||
|
}
|
|
@ -1,102 +1,5 @@
|
||||||
use std::sync::Arc;
|
mod bridge;
|
||||||
|
mod config;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
pub use bridge::MatrixTask;
|
||||||
use futures::StreamExt;
|
pub use config::MatrixConfig;
|
||||||
use matrix_sdk::Client;
|
|
||||||
use matrix_sdk::config::SyncSettings;
|
|
||||||
use matrix_sdk::event_handler::Ctx;
|
|
||||||
use matrix_sdk::room::Room;
|
|
||||||
use matrix_sdk::ruma::events::room::message::{SyncRoomMessageEvent, RoomMessageEventContent};
|
|
||||||
use matrix_sdk::ruma::{RoomId, OwnedUserId};
|
|
||||||
use tokio::select;
|
|
||||||
use crate::linkmap::Linkmap;
|
|
||||||
use crate::message::{Id, Message};
|
|
||||||
use crate::{message::{Sender, Receiver}, supervisor::{Task, TaskResult}};
|
|
||||||
|
|
||||||
pub struct MatrixTask {
|
|
||||||
rooms: Arc<Linkmap<String>>,
|
|
||||||
user: OwnedUserId,
|
|
||||||
passwd: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MatrixTask {
|
|
||||||
pub fn new(user: OwnedUserId, passwd: String, rooms: Linkmap<String>) -> Self {
|
|
||||||
Self {
|
|
||||||
user,
|
|
||||||
passwd,
|
|
||||||
rooms: Arc::new(rooms)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Context {
|
|
||||||
id: Id,
|
|
||||||
tx: Sender,
|
|
||||||
rooms: Arc<Linkmap<String>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl Task for MatrixTask {
|
|
||||||
async fn start(&self, id: Id, tx: Sender, mut rx: Receiver) -> TaskResult {
|
|
||||||
let client = Client::builder()
|
|
||||||
.server_name(&self.user.server_name())
|
|
||||||
.build().await?;
|
|
||||||
client.login_username(&self.user, &self.passwd)
|
|
||||||
.send()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
client.add_event_handler_context(Arc::new(Context {
|
|
||||||
id,
|
|
||||||
tx,
|
|
||||||
rooms: self.rooms.clone()
|
|
||||||
}));
|
|
||||||
client.add_event_handler(handle_matrix_msg);
|
|
||||||
|
|
||||||
let mut sync_stream = Box::pin(client.sync_stream(SyncSettings::default()).await);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
select! {
|
|
||||||
response = sync_stream.next() => match response {
|
|
||||||
Some(Ok(_)) => (),
|
|
||||||
Some(Err(e)) => return Err(e.into()),
|
|
||||||
None => return Err("Lost connection to Matrix server".into())
|
|
||||||
},
|
|
||||||
message = rx.recv() => match message {
|
|
||||||
Ok(msg) => handle_bridge_msg(id, &client, &self.rooms, msg).await?,
|
|
||||||
Err(e) => return Err(e.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_matrix_msg(ev: SyncRoomMessageEvent, client: Client, room: Room, ctx: Ctx<Arc<Context>>) -> TaskResult {
|
|
||||||
if Some(ev.sender()) == client.user_id() { return Ok(()) }
|
|
||||||
let SyncRoomMessageEvent::Original(ev) = ev else { return Ok(()) };
|
|
||||||
let Ok(Some(sender)) = room.get_member(&ev.sender).await else { return Ok(()) };
|
|
||||||
let Some(link) = ctx.rooms.get_link(room.room_id().as_str()) else { return Ok(()) };
|
|
||||||
let body = ev.content.body();
|
|
||||||
|
|
||||||
ctx.tx.send(Message {
|
|
||||||
origin: (ctx.id, room.room_id().to_string()),
|
|
||||||
link: link.clone(),
|
|
||||||
author: sender.name().to_owned(),
|
|
||||||
content: body.to_owned(),
|
|
||||||
})?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_bridge_msg(id: Id, client: &Client, links: &Linkmap<String>, msg: Message) -> TaskResult {
|
|
||||||
let content = RoomMessageEventContent::text_plain(
|
|
||||||
format!("<{}> {}", msg.author, msg.content)
|
|
||||||
);
|
|
||||||
for room in links.get_channels(&msg.link) {
|
|
||||||
if msg.origin.0 == id && &msg.origin.1 == room {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let Ok(room) = <&RoomId>::try_from(room.as_str()) else { continue };
|
|
||||||
let Some(Room::Joined(room)) = client.get_room(room) else { continue };
|
|
||||||
room.send(content.clone(), None).await?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
36
src/config.rs
Normal file
36
src/config.rs
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use serde_derive::Deserialize;
|
||||||
|
|
||||||
|
use crate::supervisor::Task;
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
#[serde(tag="platform")]
|
||||||
|
#[serde(rename_all="snake_case")]
|
||||||
|
pub enum Node {
|
||||||
|
#[cfg(feature="irc")]
|
||||||
|
Irc(crate::bridge_irc::IrcTask),
|
||||||
|
#[cfg(feature="matrix")]
|
||||||
|
Matrix(crate::bridge_matrix::MatrixTask),
|
||||||
|
#[cfg(feature="discord")]
|
||||||
|
Discord(crate::bridge_discord::DiscordTask),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Node {
|
||||||
|
pub fn into_task(self) -> Box<dyn Task> {
|
||||||
|
match self {
|
||||||
|
#[cfg(feature="irc")]
|
||||||
|
Node::Irc(t) => Box::new(t),
|
||||||
|
#[cfg(feature="matrix")]
|
||||||
|
Node::Matrix(t) => Box::new(t),
|
||||||
|
#[cfg(feature="discord")]
|
||||||
|
Node::Discord(t) => Box::new(t),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct Config {
|
||||||
|
pub nodes: HashMap<String, Node>,
|
||||||
|
}
|
|
@ -1,21 +1,17 @@
|
||||||
use std::{collections::HashMap, borrow::Borrow};
|
use std::{collections::HashMap, borrow::Borrow, hash::Hash, marker::PhantomData};
|
||||||
|
|
||||||
use crate::message::Link;
|
use serde::{de::Visitor, Deserialize, Deserializer};
|
||||||
use std::hash::Hash;
|
|
||||||
|
|
||||||
pub struct Linkmap<T: Hash + Eq + Clone> {
|
use crate::supervisor::Link;
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Linkmap<T: Hash + Eq> {
|
||||||
to: HashMap<T, Link>,
|
to: HashMap<T, Link>,
|
||||||
from: HashMap<Link, Vec<T>>,
|
from: HashMap<Link, Vec<T>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl <T: Hash + Eq + Clone> Linkmap<T> {
|
impl <T: Hash + Eq + Clone> Linkmap<T> {
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
to: HashMap::new(),
|
|
||||||
from: HashMap::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_capacity(capacity: usize) -> Self {
|
pub fn with_capacity(capacity: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
to: HashMap::with_capacity(capacity),
|
to: HashMap::with_capacity(capacity),
|
||||||
|
@ -29,14 +25,18 @@ impl <T: Hash + Eq + Clone> Linkmap<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_channels(&self, link: &Link) -> &[T] {
|
pub fn get_channels(&self, link: &Link) -> &[T] {
|
||||||
self.from.get(link).map(|a| a.as_slice()).unwrap_or(&[])
|
self.from.get(link).map_or(&[], Vec::as_slice)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert(&mut self, link: Link, channel: T) {
|
pub fn insert(&mut self, link: Link, channel: T) {
|
||||||
self.to.insert(channel.clone(), link.clone());
|
self.to.insert(channel.clone(), link.clone());
|
||||||
self.from.entry(link)
|
self.from.entry(link)
|
||||||
.or_insert(Vec::with_capacity(1))
|
.or_insert(Vec::with_capacity(1))
|
||||||
.push(channel.clone());
|
.push(channel);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn iter_channels(&self) -> impl Iterator<Item=&T> {
|
||||||
|
self.to.keys()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,3 +52,34 @@ impl <T: Hash + Eq + Clone> FromIterator<(T, Link)> for Linkmap<T> {
|
||||||
Self { to, from }
|
Self { to, from }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct LinkmapVisitor<T> {
|
||||||
|
_phantom: PhantomData<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <'de, T> Visitor<'de> for LinkmapVisitor<T>
|
||||||
|
where T: Eq + Hash + Clone + Deserialize<'de> {
|
||||||
|
type Value = Linkmap<T>;
|
||||||
|
|
||||||
|
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
|
write!(formatter, "a linkmap")?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn visit_map<A>(self, mut access: A) -> Result<Self::Value, A::Error>
|
||||||
|
where A: serde::de::MapAccess<'de>, {
|
||||||
|
let mut map = Linkmap::with_capacity(access.size_hint().unwrap_or(0));
|
||||||
|
while let Some((channel, link)) = access.next_entry()? {
|
||||||
|
map.insert(link, channel);
|
||||||
|
}
|
||||||
|
Ok(map)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <'de, T> Deserialize<'de> for Linkmap<T>
|
||||||
|
where T: Eq + Hash + Clone + Deserialize<'de> {
|
||||||
|
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||||
|
where D: Deserializer<'de> {
|
||||||
|
deserializer.deserialize_map(LinkmapVisitor { _phantom: PhantomData })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
29
src/main.rs
29
src/main.rs
|
@ -1,24 +1,35 @@
|
||||||
use bridge_matrix::MatrixTask;
|
use config::Node;
|
||||||
use linkmap::Linkmap;
|
|
||||||
use matrix_sdk::ruma::user_id;
|
|
||||||
use supervisor::Task;
|
|
||||||
use bridge_irc::{IrcConfig, IrcTask};
|
|
||||||
|
|
||||||
|
use crate::config::Config;
|
||||||
use crate::supervisor::run_tasks;
|
use crate::supervisor::run_tasks;
|
||||||
|
|
||||||
mod supervisor;
|
mod supervisor;
|
||||||
mod message;
|
|
||||||
mod linkmap;
|
mod linkmap;
|
||||||
|
mod config;
|
||||||
|
|
||||||
|
#[cfg(feature="irc")]
|
||||||
mod bridge_irc;
|
mod bridge_irc;
|
||||||
|
#[cfg(feature="matrix")]
|
||||||
mod bridge_matrix;
|
mod bridge_matrix;
|
||||||
|
#[cfg(feature="discord")]
|
||||||
|
mod bridge_discord;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
|
|
||||||
let tasks: Vec<Box<dyn Task>> = vec![
|
let conf_str = std::fs::read_to_string("bridge.toml").unwrap();
|
||||||
// tasks withheld for privacy
|
let config: Config = match toml::from_str(&conf_str) {
|
||||||
];
|
Ok(c) => c,
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("Error reading config: {e}");
|
||||||
|
return
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let tasks = config.nodes.into_iter()
|
||||||
|
.map(|(k, t)| (k, Node::into_task(t)))
|
||||||
|
.collect();
|
||||||
|
|
||||||
run_tasks(tasks).await;
|
run_tasks(tasks).await;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,30 +0,0 @@
|
||||||
pub type Sender = tokio::sync::broadcast::Sender<Message>;
|
|
||||||
pub type Receiver = tokio::sync::broadcast::Receiver<Message>;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
|
|
||||||
pub struct Link(String);
|
|
||||||
|
|
||||||
impl <S> From<S> for Link
|
|
||||||
where S: Into<String> {
|
|
||||||
fn from(value: S) -> Self {
|
|
||||||
Self(value.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
|
||||||
pub struct Id(usize);
|
|
||||||
|
|
||||||
impl Id {
|
|
||||||
pub const fn new(n: usize) -> Self {
|
|
||||||
Self(n)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct Message {
|
|
||||||
pub origin: (Id, String),
|
|
||||||
pub link: Link,
|
|
||||||
pub author: String,
|
|
||||||
pub content: String,
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,14 +1,46 @@
|
||||||
use std::{any::Any, time::Duration};
|
use std::{fmt, error::Error, time::Duration, any::Any, panic::AssertUnwindSafe, collections::HashMap};
|
||||||
use std::error::Error;
|
|
||||||
use std::panic::AssertUnwindSafe;
|
|
||||||
use futures::FutureExt;
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::StreamExt;
|
use futures::{stream::FuturesUnordered, StreamExt, FutureExt};
|
||||||
use futures::stream::FuturesUnordered;
|
use log::{warn, error};
|
||||||
use log::{warn, info, error};
|
use serde_derive::Deserialize;
|
||||||
|
|
||||||
use crate::message::{Sender, Receiver, Id};
|
|
||||||
|
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||||
|
pub struct Id(&'static str);
|
||||||
|
|
||||||
|
impl Id {
|
||||||
|
pub fn new(s: String) -> Self {
|
||||||
|
Self(Box::leak(s.into_boxed_str()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Id {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.write_str(self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Eq, PartialEq, Hash, Deserialize)]
|
||||||
|
pub struct Link(String);
|
||||||
|
|
||||||
|
impl <S> From<S> for Link
|
||||||
|
where S: Into<String> {
|
||||||
|
fn from(value: S) -> Self {
|
||||||
|
Self(value.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct Message {
|
||||||
|
pub origin: (Id, String),
|
||||||
|
pub link: Link,
|
||||||
|
pub author: String,
|
||||||
|
pub content: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type Sender = tokio::sync::broadcast::Sender<Message>;
|
||||||
|
pub type Receiver = tokio::sync::broadcast::Receiver<Message>;
|
||||||
|
|
||||||
pub type TaskResult = Result<(), Box<dyn Error>>;
|
pub type TaskResult = Result<(), Box<dyn Error>>;
|
||||||
|
|
||||||
|
@ -26,10 +58,14 @@ enum ExitStatus {
|
||||||
Panic(Box<dyn Any + 'static>),
|
Panic(Box<dyn Any + 'static>),
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn start_task(id: usize, task: &dyn Task, tx: Sender, timeout: Duration) -> (usize, ExitStatus) {
|
async fn start_task(id: Id, task: &dyn Task, tx: Sender, timeout: Duration) -> (Id, ExitStatus) {
|
||||||
|
if !timeout.is_zero() {
|
||||||
|
warn!("task '{id}': waiting {timeout:?} to start");
|
||||||
tokio::time::sleep(timeout).await;
|
tokio::time::sleep(timeout).await;
|
||||||
|
}
|
||||||
|
warn!("task '{id}': starting");
|
||||||
let rx = tx.subscribe();
|
let rx = tx.subscribe();
|
||||||
let future = AssertUnwindSafe(task.start(Id::new(id), tx, rx)).catch_unwind();
|
let future = AssertUnwindSafe(task.start(id, tx, rx)).catch_unwind();
|
||||||
let result = match future.await {
|
let result = match future.await {
|
||||||
Ok(Ok(_)) => ExitStatus::Success,
|
Ok(Ok(_)) => ExitStatus::Success,
|
||||||
Ok(Err(e)) => ExitStatus::Error(e),
|
Ok(Err(e)) => ExitStatus::Error(e),
|
||||||
|
@ -38,21 +74,22 @@ async fn start_task(id: usize, task: &dyn Task, tx: Sender, timeout: Duration) -
|
||||||
(id, result)
|
(id, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run_tasks(tasks: Vec<Box<dyn Task>>) {
|
pub async fn run_tasks(tasks: HashMap<String, Box<dyn Task>>) {
|
||||||
let mut futures = FuturesUnordered::new();
|
let mut futures = FuturesUnordered::new();
|
||||||
let (tx, _) = tokio::sync::broadcast::channel(64);
|
let (tx, _) = tokio::sync::broadcast::channel(64);
|
||||||
for (id, task) in tasks.iter().enumerate() {
|
for (id, task) in &tasks {
|
||||||
|
let id = Id::new(id.clone());
|
||||||
futures.push(start_task(id, task.as_ref(), tx.clone(), Duration::ZERO));
|
futures.push(start_task(id, task.as_ref(), tx.clone(), Duration::ZERO));
|
||||||
}
|
}
|
||||||
while let Some((id, result)) = futures.next().await {
|
while let Some((id, result)) = futures.next().await {
|
||||||
let task = &tasks[id];
|
let task = &tasks[id.0];
|
||||||
|
let dur = task.restart_timeout();
|
||||||
match &result {
|
match &result {
|
||||||
ExitStatus::Success => warn!("task {id:?} exited successfully"),
|
ExitStatus::Success => warn!("task '{id}' exited without error"),
|
||||||
ExitStatus::Error(e) => warn!("task {id:?}: exited with error: {e}"),
|
ExitStatus::Error(e) => warn!("task '{id}': exited with error: {e}"),
|
||||||
ExitStatus::Panic(_) => error!("task {id:?}: panicked"),
|
ExitStatus::Panic(_) => error!("task '{id}': panicked"),
|
||||||
}
|
}
|
||||||
if let Some(dur) = task.restart_timeout() {
|
if let Some(dur) = dur {
|
||||||
info!("task {id:?}: retrying in {dur:?}");
|
|
||||||
futures.push(start_task(id, task.as_ref(), tx.clone(), dur));
|
futures.push(start_task(id, task.as_ref(), tx.clone(), dur));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue