From 1cc19478e38cc05653724bde4a427267f0d8446a Mon Sep 17 00:00:00 2001 From: TriMill Date: Wed, 24 May 2023 00:23:36 -0400 Subject: [PATCH] made changes --- Cargo.lock | 68 +++++++++++------------ Cargo.toml | 29 ++++++++-- example-config.toml | 34 ++++++++++++ src/bridge_discord/bridge.rs | 79 ++++++++++++++++++++++++++ src/bridge_discord/config.rs | 6 ++ src/bridge_discord/mod.rs | 5 ++ src/bridge_irc/bridge.rs | 73 ++++++++++++++++++++++++ src/bridge_irc/config.rs | 10 ++++ src/bridge_irc/mod.rs | 70 ++--------------------- src/bridge_matrix/bridge.rs | 96 ++++++++++++++++++++++++++++++++ src/bridge_matrix/config.rs | 8 +++ src/bridge_matrix/mod.rs | 105 ++--------------------------------- src/config.rs | 36 ++++++++++++ src/linkmap.rs | 57 ++++++++++++++----- src/main.rs | 29 +++++++--- src/message.rs | 30 ---------- src/supervisor.rs | 75 ++++++++++++++++++------- 17 files changed, 530 insertions(+), 280 deletions(-) create mode 100644 example-config.toml create mode 100644 src/bridge_discord/bridge.rs create mode 100644 src/bridge_discord/config.rs create mode 100644 src/bridge_discord/mod.rs create mode 100644 src/bridge_irc/bridge.rs create mode 100644 src/bridge_irc/config.rs create mode 100644 src/bridge_matrix/bridge.rs create mode 100644 src/bridge_matrix/config.rs create mode 100644 src/config.rs delete mode 100644 src/message.rs diff --git a/Cargo.lock b/Cargo.lock index b6dba38..85f6da1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -329,7 +329,6 @@ dependencies = [ "js-sys", "num-integer", "num-traits", - "serde", "time 0.1.45", "wasm-bindgen", "winapi", @@ -363,17 +362,6 @@ dependencies = [ "bitflags", ] -[[package]] -name = "command_attr" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d999d4e7731150ee14aee8f619c7a9aa9a4385bca0606c4fa95aa2f36a05d9a" -dependencies = [ - "proc-macro2 1.0.56", - "quote 1.0.27", - "syn 1.0.109", -] - [[package]] name = "const-oid" version = "0.7.1" @@ -1251,7 +1239,7 @@ dependencies = [ "tokio-native-tls", "tokio-stream", "tokio-util 0.6.10", - "toml", + "toml 0.5.11", ] [[package]] @@ -1327,12 +1315,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" -[[package]] -name = "levenshtein" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db13adb97ab515a3691f56e4dbab09283d0b86cb45abd991d8634a9d6f501760" - [[package]] name = "libc" version = "0.2.144" @@ -1620,8 +1602,11 @@ dependencies = [ "irc", "log", "matrix-sdk", + "serde", + "serde_derive", "serenity", "tokio", + "toml 0.7.4", ] [[package]] @@ -2324,7 +2309,7 @@ dependencies = [ "ruma-identifiers-validation", "serde", "syn 1.0.109", - "toml", + "toml 0.5.11", ] [[package]] @@ -2488,6 +2473,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93107647184f6027e3b7dcb2e11034cf95ffa1e3a682c67951963ac69c1c007d" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2512,12 +2506,9 @@ dependencies = [ "bitflags", "bytes", "cfg-if", - "chrono", - "command_attr", "dashmap", "flate2", "futures", - "levenshtein", "mime", "mime_guess", "parking_lot 0.12.1", @@ -2526,13 +2517,11 @@ dependencies = [ "serde", "serde-value", "serde_json", - "static_assertions", "time 0.3.21", "tokio", "tracing", "typemap_rev", "url", - "uwl", ] [[package]] @@ -2632,12 +2621,6 @@ dependencies = [ "der", ] -[[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" - [[package]] name = "strsim" version = "0.10.0" @@ -2885,11 +2868,26 @@ dependencies = [ "serde", ] +[[package]] +name = "toml" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + [[package]] name = "toml_datetime" version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a76a9312f5ba4c2dec6b9161fdf25d87ad8a09256ccea5a556fef03c706a10f" +dependencies = [ + "serde", +] [[package]] name = "toml_edit" @@ -2898,6 +2896,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92d964908cec0d030b812013af25a0e57fddfadb1e066ecc6681d86253129d4f" dependencies = [ "indexmap", + "serde", + "serde_spanned", "toml_datetime", "winnow", ] @@ -3078,12 +3078,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "uwl" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4bf03e0ca70d626ecc4ba6b0763b934b6f2976e8c744088bb3c1d646fbb1ad0" - [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 3f5d808..cadf5ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,13 +5,32 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +irc = ["dep:irc"] +matrix = ["dep:matrix-sdk"] +discord = ["dep:serenity"] +default = ["irc", "matrix", "discord"] + [dependencies] +env_logger = "0.10" +log = "0.4" futures = "0.3" tokio = { version = "1.28", features = ["rt-multi-thread", "macros", "time", "sync"] } async-trait = "0.1" -env_logger = "0.10" -log = "0.4" +toml = "0.7" +serde = "1.0" +serde_derive = "1.0" -irc = "0.15" -matrix-sdk = "0.6" -serenity = "0.11" +[dependencies.irc] +version = "0.15" +optional = true + +[dependencies.matrix-sdk] +version = "0.6" +optional = true + +[dependencies.serenity] +version = "0.11" +optional = true +default-features = false +features = ["client", "cache", "gateway", "rustls_backend", "model"] diff --git a/example-config.toml b/example-config.toml new file mode 100644 index 0000000..d9819e5 --- /dev/null +++ b/example-config.toml @@ -0,0 +1,34 @@ +#=-- example irc node --=# +[nodes.a] +platform = "irc" + +[nodes.a.config] +server = "example.com" +port = 6667 +nick = "example" +alt_nicks = [] +tls = true + +[nodes.a.links] +"#example" = "example-link" + +#=-- example matrix node --=# +[nodes.b] +platform = "matrix" + +[nodes.b.config] +user = "@example:example.com" +password = "hunter2" + +[nodes.b.links] +"!abcdefghijklmnopqr:example.com" = "example-link" + +#=-- example discord node --=# +[nodes.c] +platform = "discord" + +[nodes.c.config] +token = "MTAyNTYzNDA0NDkxNjAxMTA4OQ.G7CojH.TMB1DrSuncWUOU7M4ELDyMpmEdCaziZTDhoWt4" + +[nodes.c.links] +"123456789012345678" = "example-link" diff --git a/src/bridge_discord/bridge.rs b/src/bridge_discord/bridge.rs new file mode 100644 index 0000000..e8994e0 --- /dev/null +++ b/src/bridge_discord/bridge.rs @@ -0,0 +1,79 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use serde::Deserialize; +use serenity::{prelude::*, model::prelude::*, http::Http}; + +use crate::{linkmap::Linkmap, supervisor::{Task, TaskResult, Id, Sender, Receiver}}; +use crate::supervisor::Message as BMessage; + +use super::DiscordConfig; + +#[derive(Debug, Deserialize)] +pub struct DiscordTask { + config: DiscordConfig, + links: Arc>, +} + +#[async_trait] +impl Task for DiscordTask { + async fn start(&self, id: Id, tx: Sender, mut rx: Receiver) -> TaskResult { + let handler = Handler { + links: self.links.clone(), + tx, + id, + }; + let intents = GatewayIntents::GUILD_MESSAGES | GatewayIntents::MESSAGE_CONTENT; + let mut client = Client::builder(&self.config.token, intents) + .event_handler(handler) + .await?; + let http = client.cache_and_http.http.clone(); + let bridge_handler = async move { + loop { + let msg = rx.recv().await?; + handle_bridge_msg(msg, id, &self.links, http.as_ref()).await?; + } + }; + tokio::select! { + result = bridge_handler => result, + result = client.start() => result.map_err(|e| e.into()), + } + } +} + +async fn handle_bridge_msg(msg: BMessage, id: Id, links: &Linkmap, http: &Http) -> TaskResult { + let content = format!("<{}> {}", msg.author, msg.content); + for channel in links.get_channels(&msg.link) { + if msg.origin.0 == id && msg.origin.1 == channel.to_string() { + continue + } + channel.say(http, &content).await?; + } + Ok(()) +} + +struct Handler { + id: Id, + links: Arc>, + tx: Sender, +} + +#[async_trait] +impl EventHandler for Handler { + async fn message(&self, ctx: Context, msg: Message) { + if msg.author.id == ctx.cache.current_user_id() { + return + } + let Some(link) = self.links.get_link(&msg.channel_id) else { return }; + self.tx.send(BMessage { + origin: (self.id, msg.channel_id.to_string()), + link: link.clone(), + author: msg.author.nick_in(ctx.http, msg.guild_id.unwrap()).await.unwrap_or(msg.author.name), + content: msg.content, + }).unwrap(); + } + + async fn ready(&self, _: Context, _: Ready) { + println!("ready"); + } +} diff --git a/src/bridge_discord/config.rs b/src/bridge_discord/config.rs new file mode 100644 index 0000000..d0c55a6 --- /dev/null +++ b/src/bridge_discord/config.rs @@ -0,0 +1,6 @@ +use serde::Deserialize; + +#[derive(Debug, Deserialize)] +pub struct DiscordConfig { + pub token: String, +} diff --git a/src/bridge_discord/mod.rs b/src/bridge_discord/mod.rs new file mode 100644 index 0000000..4f9a5cc --- /dev/null +++ b/src/bridge_discord/mod.rs @@ -0,0 +1,5 @@ +mod bridge; +mod config; + +pub use bridge::DiscordTask; +pub use config::DiscordConfig; diff --git a/src/bridge_irc/bridge.rs b/src/bridge_irc/bridge.rs new file mode 100644 index 0000000..eee7419 --- /dev/null +++ b/src/bridge_irc/bridge.rs @@ -0,0 +1,73 @@ +use async_trait::async_trait; +use futures::StreamExt; +use irc::client::prelude::Config; +use serde::Deserialize; +use tokio::select; +use irc::{client::Client, proto::Command}; +use crate::linkmap::Linkmap; +use crate::supervisor::Id; +use crate::{supervisor::{Message, Sender, Receiver, Task, TaskResult}}; +use irc::proto::Message as IrcMessage; + +use super::IrcConfig; + +#[derive(Debug, Deserialize)] +pub struct IrcTask { + config: IrcConfig, + links: Linkmap, +} + +impl IrcTask { + async fn handle_bridge_msg(&self, id: Id, client: &mut Client, msg: Message) -> TaskResult { + for channel in self.links.get_channels(&msg.link) { + if msg.origin.0 == id && &msg.origin.1 == channel { + continue + } + client.send_privmsg(channel, format!("<{}> {}", msg.author, msg.content))?; + } + Ok(()) + } + + async fn handle_irc_msg(&self, id: Id, tx: &mut Sender, msg: IrcMessage) -> TaskResult { + if let Command::PRIVMSG(channel, message) = &msg.command { + let Some(link) = self.links.get_link(channel) else { return Ok(()) }; + let Some(author) = msg.source_nickname() else { return Ok(()) }; + tx.send(Message { + origin: (id, channel.clone()), + link: link.clone(), + author: author.to_owned(), + content: message.clone(), + })?; + } + Ok(()) + } +} + +#[async_trait] +impl Task for IrcTask { + async fn start(&self, id: Id, mut tx: Sender, mut rx: Receiver) -> TaskResult { + let config = Config { + server: Some(self.config.server.clone()), + port: Some(self.config.port), + nickname: Some(self.config.nick.clone()), + alt_nicks: self.config.alt_nicks.clone(), + use_tls: Some(self.config.tls), + channels: self.links.iter_channels().cloned().collect(), + ..Config::default() + }; + let mut client = Client::from_config(config).await?; + client.identify()?; + + let mut stream = client.stream()?; + + loop { + select! { + bridge_msg = rx.recv() => self.handle_bridge_msg(id, &mut client, bridge_msg?).await?, + irc_msg = stream.next() => match irc_msg { + Some(msg) => self.handle_irc_msg(id, &mut tx, msg?).await?, + None => return Err("Lost connection to IRC server".into()), + } + }; + } + } +} diff --git a/src/bridge_irc/config.rs b/src/bridge_irc/config.rs new file mode 100644 index 0000000..2019c89 --- /dev/null +++ b/src/bridge_irc/config.rs @@ -0,0 +1,10 @@ +use serde::Deserialize; + +#[derive(Clone, Debug, Deserialize)] +pub struct IrcConfig { + pub server: String, + pub port: u16, + pub tls: bool, + pub nick: String, + pub alt_nicks: Vec, +} diff --git a/src/bridge_irc/mod.rs b/src/bridge_irc/mod.rs index 2f407ec..6055442 100644 --- a/src/bridge_irc/mod.rs +++ b/src/bridge_irc/mod.rs @@ -1,67 +1,5 @@ -use async_trait::async_trait; -use futures::StreamExt; -use tokio::select; -use irc::{client::Client, proto::Command}; -use crate::linkmap::Linkmap; -use crate::message::Id; -use crate::{message::{Message, Sender, Receiver}, supervisor::{Task, TaskResult}}; -use irc::proto::Message as IrcMessage; +mod config; +mod bridge; -pub use irc::client::prelude::Config as IrcConfig; - -pub struct IrcTask { - config: IrcConfig, - channels: Linkmap, -} - -impl IrcTask { - pub fn new(config: IrcConfig, channels: Linkmap) -> Self { - Self { - config, channels - } - } - - async fn handle_bridge_msg(&self, id: Id, client: &mut Client, msg: Message) -> TaskResult { - for channel in self.channels.get_channels(&msg.link) { - if msg.origin.0 == id && &msg.origin.1 == channel { - continue - } - client.send_privmsg(channel, format!("<{}> {}", msg.author, msg.content))?; - } - Ok(()) - } - - async fn handle_irc_msg(&self, id: Id, tx: &mut Sender, msg: IrcMessage) -> TaskResult { - if let Command::PRIVMSG(channel, message) = &msg.command { - let Some(link) = self.channels.get_link(channel) else { return Ok(()) }; - let Some(author) = msg.source_nickname() else { return Ok(()) }; - tx.send(Message { - origin: (id, channel.to_owned()), - link: link.clone(), - author: author.to_owned(), - content: message.clone(), - })?; - } - Ok(()) - } -} - -#[async_trait] -impl Task for IrcTask { - async fn start(&self, id: Id, mut tx: Sender, mut rx: Receiver) -> TaskResult { - let mut client = Client::from_config(self.config.clone()).await?; - client.identify()?; - - let mut stream = client.stream()?; - - loop { - select! { - bridge_msg = rx.recv() => self.handle_bridge_msg(id, &mut client, bridge_msg?).await?, - irc_msg = stream.next() => match irc_msg { - Some(msg) => self.handle_irc_msg(id, &mut tx, msg?).await?, - None => return Err("Lost connection to IRC server".into()), - } - }; - } - } -} +pub use config::IrcConfig; +pub use bridge::IrcTask; diff --git a/src/bridge_matrix/bridge.rs b/src/bridge_matrix/bridge.rs new file mode 100644 index 0000000..5746e15 --- /dev/null +++ b/src/bridge_matrix/bridge.rs @@ -0,0 +1,96 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use futures::StreamExt; +use matrix_sdk::Client; +use matrix_sdk::config::SyncSettings; +use matrix_sdk::event_handler::Ctx; +use matrix_sdk::room::Room; +use matrix_sdk::ruma::events::room::message::{SyncRoomMessageEvent, RoomMessageEventContent}; +use matrix_sdk::ruma::RoomId; +use serde::Deserialize; +use tokio::select; +use crate::linkmap::Linkmap; +use crate::supervisor::Message; +use crate::supervisor::Id; +use crate::{supervisor::{Sender, Receiver, Task, TaskResult}}; + +use super::MatrixConfig; + +#[derive(Debug, Deserialize)] +pub struct MatrixTask { + config: MatrixConfig, + links: Arc>, +} + +struct Context { + id: Id, + tx: Sender, + rooms: Arc>, +} + +#[async_trait] +impl Task for MatrixTask { + async fn start(&self, id: Id, tx: Sender, mut rx: Receiver) -> TaskResult { + let client = Client::builder() + .server_name(self.config.user.server_name()) + .build().await?; + client.login_username(&self.config.user, &self.config.password) + .send() + .await?; + + client.add_event_handler_context(Arc::new(Context { + id, + tx, + rooms: self.links.clone() + })); + client.add_event_handler(handle_matrix_msg); + + let mut sync_stream = Box::pin(client.sync_stream(SyncSettings::default()).await); + + loop { + select! { + response = sync_stream.next() => match response { + Some(Ok(_)) => (), + Some(Err(e)) => return Err(e.into()), + None => return Err("Lost connection to Matrix server".into()) + }, + message = rx.recv() => match message { + Ok(msg) => handle_bridge_msg(id, &client, &self.links, msg).await?, + Err(e) => return Err(e.into()) + } + } + } + } +} + +async fn handle_matrix_msg(ev: SyncRoomMessageEvent, client: Client, room: Room, ctx: Ctx>) -> TaskResult { + if Some(ev.sender()) == client.user_id() { return Ok(()) } + let SyncRoomMessageEvent::Original(ev) = ev else { return Ok(()) }; + let Ok(Some(sender)) = room.get_member(&ev.sender).await else { return Ok(()) }; + let Some(link) = ctx.rooms.get_link(room.room_id().as_str()) else { return Ok(()) }; + let body = ev.content.body(); + + ctx.tx.send(Message { + origin: (ctx.id, room.room_id().to_string()), + link: link.clone(), + author: sender.name().to_owned(), + content: body.to_owned(), + })?; + Ok(()) +} + +async fn handle_bridge_msg(id: Id, client: &Client, links: &Linkmap, msg: Message) -> TaskResult { + let content = RoomMessageEventContent::text_plain( + format!("<{}> {}", msg.author, msg.content) + ); + for room in links.get_channels(&msg.link) { + if msg.origin.0 == id && &msg.origin.1 == room { + continue; + } + let Ok(room) = <&RoomId>::try_from(room.as_str()) else { continue }; + let Some(Room::Joined(room)) = client.get_room(room) else { continue }; + room.send(content.clone(), None).await?; + } + Ok(()) +} diff --git a/src/bridge_matrix/config.rs b/src/bridge_matrix/config.rs new file mode 100644 index 0000000..61ce86e --- /dev/null +++ b/src/bridge_matrix/config.rs @@ -0,0 +1,8 @@ +use matrix_sdk::ruma::OwnedUserId; +use serde::Deserialize; + +#[derive(Debug, Deserialize)] +pub struct MatrixConfig { + pub user: OwnedUserId, + pub password: String, +} diff --git a/src/bridge_matrix/mod.rs b/src/bridge_matrix/mod.rs index edb8bf7..2d0e51a 100644 --- a/src/bridge_matrix/mod.rs +++ b/src/bridge_matrix/mod.rs @@ -1,102 +1,5 @@ -use std::sync::Arc; +mod bridge; +mod config; -use async_trait::async_trait; -use futures::StreamExt; -use matrix_sdk::Client; -use matrix_sdk::config::SyncSettings; -use matrix_sdk::event_handler::Ctx; -use matrix_sdk::room::Room; -use matrix_sdk::ruma::events::room::message::{SyncRoomMessageEvent, RoomMessageEventContent}; -use matrix_sdk::ruma::{RoomId, OwnedUserId}; -use tokio::select; -use crate::linkmap::Linkmap; -use crate::message::{Id, Message}; -use crate::{message::{Sender, Receiver}, supervisor::{Task, TaskResult}}; - -pub struct MatrixTask { - rooms: Arc>, - user: OwnedUserId, - passwd: String, -} - -impl MatrixTask { - pub fn new(user: OwnedUserId, passwd: String, rooms: Linkmap) -> Self { - Self { - user, - passwd, - rooms: Arc::new(rooms) - } - } -} - -struct Context { - id: Id, - tx: Sender, - rooms: Arc>, -} - -#[async_trait] -impl Task for MatrixTask { - async fn start(&self, id: Id, tx: Sender, mut rx: Receiver) -> TaskResult { - let client = Client::builder() - .server_name(&self.user.server_name()) - .build().await?; - client.login_username(&self.user, &self.passwd) - .send() - .await?; - - client.add_event_handler_context(Arc::new(Context { - id, - tx, - rooms: self.rooms.clone() - })); - client.add_event_handler(handle_matrix_msg); - - let mut sync_stream = Box::pin(client.sync_stream(SyncSettings::default()).await); - - loop { - select! { - response = sync_stream.next() => match response { - Some(Ok(_)) => (), - Some(Err(e)) => return Err(e.into()), - None => return Err("Lost connection to Matrix server".into()) - }, - message = rx.recv() => match message { - Ok(msg) => handle_bridge_msg(id, &client, &self.rooms, msg).await?, - Err(e) => return Err(e.into()) - } - } - } - } -} - -async fn handle_matrix_msg(ev: SyncRoomMessageEvent, client: Client, room: Room, ctx: Ctx>) -> TaskResult { - if Some(ev.sender()) == client.user_id() { return Ok(()) } - let SyncRoomMessageEvent::Original(ev) = ev else { return Ok(()) }; - let Ok(Some(sender)) = room.get_member(&ev.sender).await else { return Ok(()) }; - let Some(link) = ctx.rooms.get_link(room.room_id().as_str()) else { return Ok(()) }; - let body = ev.content.body(); - - ctx.tx.send(Message { - origin: (ctx.id, room.room_id().to_string()), - link: link.clone(), - author: sender.name().to_owned(), - content: body.to_owned(), - })?; - Ok(()) -} - -async fn handle_bridge_msg(id: Id, client: &Client, links: &Linkmap, msg: Message) -> TaskResult { - let content = RoomMessageEventContent::text_plain( - format!("<{}> {}", msg.author, msg.content) - ); - for room in links.get_channels(&msg.link) { - if msg.origin.0 == id && &msg.origin.1 == room { - continue; - } - let Ok(room) = <&RoomId>::try_from(room.as_str()) else { continue }; - let Some(Room::Joined(room)) = client.get_room(room) else { continue }; - room.send(content.clone(), None).await?; - } - Ok(()) -} +pub use bridge::MatrixTask; +pub use config::MatrixConfig; diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..ae5d483 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,36 @@ +use std::collections::HashMap; + +use serde_derive::Deserialize; + +use crate::supervisor::Task; + + +#[derive(Debug, Deserialize)] +#[serde(tag="platform")] +#[serde(rename_all="snake_case")] +pub enum Node { + #[cfg(feature="irc")] + Irc(crate::bridge_irc::IrcTask), + #[cfg(feature="matrix")] + Matrix(crate::bridge_matrix::MatrixTask), + #[cfg(feature="discord")] + Discord(crate::bridge_discord::DiscordTask), +} + +impl Node { + pub fn into_task(self) -> Box { + match self { + #[cfg(feature="irc")] + Node::Irc(t) => Box::new(t), + #[cfg(feature="matrix")] + Node::Matrix(t) => Box::new(t), + #[cfg(feature="discord")] + Node::Discord(t) => Box::new(t), + } + } +} + +#[derive(Debug, Deserialize)] +pub struct Config { + pub nodes: HashMap, +} diff --git a/src/linkmap.rs b/src/linkmap.rs index 2d0efae..2bef46d 100644 --- a/src/linkmap.rs +++ b/src/linkmap.rs @@ -1,21 +1,17 @@ -use std::{collections::HashMap, borrow::Borrow}; +use std::{collections::HashMap, borrow::Borrow, hash::Hash, marker::PhantomData}; -use crate::message::Link; -use std::hash::Hash; +use serde::{de::Visitor, Deserialize, Deserializer}; -pub struct Linkmap { +use crate::supervisor::Link; + + +#[derive(Debug)] +pub struct Linkmap { to: HashMap, from: HashMap>, } impl Linkmap { - pub fn new() -> Self { - Self { - to: HashMap::new(), - from: HashMap::new(), - } - } - pub fn with_capacity(capacity: usize) -> Self { Self { to: HashMap::with_capacity(capacity), @@ -29,14 +25,18 @@ impl Linkmap { } pub fn get_channels(&self, link: &Link) -> &[T] { - self.from.get(link).map(|a| a.as_slice()).unwrap_or(&[]) + self.from.get(link).map_or(&[], Vec::as_slice) } pub fn insert(&mut self, link: Link, channel: T) { self.to.insert(channel.clone(), link.clone()); self.from.entry(link) .or_insert(Vec::with_capacity(1)) - .push(channel.clone()); + .push(channel); + } + + pub fn iter_channels(&self) -> impl Iterator { + self.to.keys() } } @@ -52,3 +52,34 @@ impl FromIterator<(T, Link)> for Linkmap { Self { to, from } } } + +struct LinkmapVisitor { + _phantom: PhantomData, +} + +impl <'de, T> Visitor<'de> for LinkmapVisitor +where T: Eq + Hash + Clone + Deserialize<'de> { + type Value = Linkmap; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(formatter, "a linkmap")?; + Ok(()) + } + + fn visit_map(self, mut access: A) -> Result + where A: serde::de::MapAccess<'de>, { + let mut map = Linkmap::with_capacity(access.size_hint().unwrap_or(0)); + while let Some((channel, link)) = access.next_entry()? { + map.insert(link, channel); + } + Ok(map) + } +} + +impl <'de, T> Deserialize<'de> for Linkmap +where T: Eq + Hash + Clone + Deserialize<'de> { + fn deserialize(deserializer: D) -> Result + where D: Deserializer<'de> { + deserializer.deserialize_map(LinkmapVisitor { _phantom: PhantomData }) + } +} diff --git a/src/main.rs b/src/main.rs index 805aa36..c097090 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,24 +1,35 @@ -use bridge_matrix::MatrixTask; -use linkmap::Linkmap; -use matrix_sdk::ruma::user_id; -use supervisor::Task; -use bridge_irc::{IrcConfig, IrcTask}; +use config::Node; +use crate::config::Config; use crate::supervisor::run_tasks; mod supervisor; -mod message; mod linkmap; +mod config; +#[cfg(feature="irc")] mod bridge_irc; +#[cfg(feature="matrix")] mod bridge_matrix; +#[cfg(feature="discord")] +mod bridge_discord; #[tokio::main] async fn main() { env_logger::init(); - let tasks: Vec> = vec![ - // tasks withheld for privacy - ]; + let conf_str = std::fs::read_to_string("bridge.toml").unwrap(); + let config: Config = match toml::from_str(&conf_str) { + Ok(c) => c, + Err(e) => { + eprintln!("Error reading config: {e}"); + return + } + }; + + let tasks = config.nodes.into_iter() + .map(|(k, t)| (k, Node::into_task(t))) + .collect(); + run_tasks(tasks).await; } diff --git a/src/message.rs b/src/message.rs deleted file mode 100644 index 1286376..0000000 --- a/src/message.rs +++ /dev/null @@ -1,30 +0,0 @@ -pub type Sender = tokio::sync::broadcast::Sender; -pub type Receiver = tokio::sync::broadcast::Receiver; - -#[derive(Clone, Debug, Eq, PartialEq, Hash)] -pub struct Link(String); - -impl From for Link -where S: Into { - fn from(value: S) -> Self { - Self(value.into()) - } -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub struct Id(usize); - -impl Id { - pub const fn new(n: usize) -> Self { - Self(n) - } -} - -#[derive(Clone, Debug)] -pub struct Message { - pub origin: (Id, String), - pub link: Link, - pub author: String, - pub content: String, -} - diff --git a/src/supervisor.rs b/src/supervisor.rs index 27d810f..dce869b 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -1,14 +1,46 @@ -use std::{any::Any, time::Duration}; -use std::error::Error; -use std::panic::AssertUnwindSafe; -use futures::FutureExt; +use std::{fmt, error::Error, time::Duration, any::Any, panic::AssertUnwindSafe, collections::HashMap}; use async_trait::async_trait; -use futures::StreamExt; -use futures::stream::FuturesUnordered; -use log::{warn, info, error}; +use futures::{stream::FuturesUnordered, StreamExt, FutureExt}; +use log::{warn, error}; +use serde_derive::Deserialize; -use crate::message::{Sender, Receiver, Id}; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct Id(&'static str); + +impl Id { + pub fn new(s: String) -> Self { + Self(Box::leak(s.into_boxed_str())) + } +} + +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.0) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash, Deserialize)] +pub struct Link(String); + +impl From for Link +where S: Into { + fn from(value: S) -> Self { + Self(value.into()) + } +} + +#[derive(Clone, Debug)] +pub struct Message { + pub origin: (Id, String), + pub link: Link, + pub author: String, + pub content: String, +} + +pub type Sender = tokio::sync::broadcast::Sender; +pub type Receiver = tokio::sync::broadcast::Receiver; pub type TaskResult = Result<(), Box>; @@ -26,10 +58,14 @@ enum ExitStatus { Panic(Box), } -async fn start_task(id: usize, task: &dyn Task, tx: Sender, timeout: Duration) -> (usize, ExitStatus) { - tokio::time::sleep(timeout).await; +async fn start_task(id: Id, task: &dyn Task, tx: Sender, timeout: Duration) -> (Id, ExitStatus) { + if !timeout.is_zero() { + warn!("task '{id}': waiting {timeout:?} to start"); + tokio::time::sleep(timeout).await; + } + warn!("task '{id}': starting"); let rx = tx.subscribe(); - let future = AssertUnwindSafe(task.start(Id::new(id), tx, rx)).catch_unwind(); + let future = AssertUnwindSafe(task.start(id, tx, rx)).catch_unwind(); let result = match future.await { Ok(Ok(_)) => ExitStatus::Success, Ok(Err(e)) => ExitStatus::Error(e), @@ -38,21 +74,22 @@ async fn start_task(id: usize, task: &dyn Task, tx: Sender, timeout: Duration) - (id, result) } -pub async fn run_tasks(tasks: Vec>) { +pub async fn run_tasks(tasks: HashMap>) { let mut futures = FuturesUnordered::new(); let (tx, _) = tokio::sync::broadcast::channel(64); - for (id, task) in tasks.iter().enumerate() { + for (id, task) in &tasks { + let id = Id::new(id.clone()); futures.push(start_task(id, task.as_ref(), tx.clone(), Duration::ZERO)); } while let Some((id, result)) = futures.next().await { - let task = &tasks[id]; + let task = &tasks[id.0]; + let dur = task.restart_timeout(); match &result { - ExitStatus::Success => warn!("task {id:?} exited successfully"), - ExitStatus::Error(e) => warn!("task {id:?}: exited with error: {e}"), - ExitStatus::Panic(_) => error!("task {id:?}: panicked"), + ExitStatus::Success => warn!("task '{id}' exited without error"), + ExitStatus::Error(e) => warn!("task '{id}': exited with error: {e}"), + ExitStatus::Panic(_) => error!("task '{id}': panicked"), } - if let Some(dur) = task.restart_timeout() { - info!("task {id:?}: retrying in {dur:?}"); + if let Some(dur) = dur { futures.push(start_task(id, task.as_ref(), tx.clone(), dur)); } }