use std::sync::Arc; use async_trait::async_trait; use futures::StreamExt; use log::debug; use matrix_sdk::{Client, config::SyncSettings, event_handler::Ctx, room::Room}; use matrix_sdk::ruma::RoomId; use matrix_sdk::ruma::events::room::message::{SyncRoomMessageEvent, RoomMessageEventContent}; use serde::Deserialize; use tokio::select; use crate::linkmap::Linkmap; use crate::supervisor::{Message, Id, Sender, Receiver, Task, TaskResult}; use super::MatrixConfig; #[derive(Debug, Deserialize)] pub struct MatrixTask { config: MatrixConfig, links: Arc>, } struct Context { id: Id, tx: Sender, rooms: Arc>, } #[async_trait] impl Task for MatrixTask { async fn start(&self, id: Id, tx: Sender, mut rx: Receiver) -> TaskResult { debug!("{id}: building client"); let client = Client::builder() .server_name(self.config.user.server_name()) .build().await?; debug!("{id}: logging in"); client.login_username(&self.config.user, &self.config.password) .send() .await?; client.add_event_handler_context(Arc::new(Context { id, tx, rooms: self.links.clone() })); client.add_event_handler(handle_matrix_msg); let mut sync_stream = Box::pin(client.sync_stream(SyncSettings::default()).await); debug!("{id}: entering event loop"); loop { select! { response = sync_stream.next() => match response { Some(Ok(_)) => (), Some(Err(e)) => return Err(e.into()), None => return Err("Lost connection to Matrix server".into()) }, message = rx.recv() => match message { Ok(msg) => handle_bridge_msg(id, &client, &self.links, msg).await?, Err(e) => return Err(e.into()) } } } } } async fn handle_matrix_msg(ev: SyncRoomMessageEvent, client: Client, room: Room, ctx: Ctx>) -> TaskResult { if Some(ev.sender()) == client.user_id() { return Ok(()) } let SyncRoomMessageEvent::Original(ev) = ev else { return Ok(()) }; let Ok(Some(sender)) = room.get_member(&ev.sender).await else { return Ok(()) }; let Some(link) = ctx.rooms.get_link(room.room_id().as_str()) else { return Ok(()) }; let body = ev.content.body(); debug!("{}: broadcasting message from {:?} to {:?}", ctx.id, room, link); ctx.tx.send(Message { origin: (ctx.id, room.room_id().to_string()), link: link.clone(), author: sender.name().to_owned(), content: body.to_owned(), })?; Ok(()) } async fn handle_bridge_msg(id: Id, client: &Client, links: &Linkmap, msg: Message) -> TaskResult { let content = RoomMessageEventContent::text_plain( format!("<{}> {}", msg.author, msg.content) ); for room in links.get_channels(&msg.link) { if msg.origin.0 == id && &msg.origin.1 == room { continue; } let Ok(room) = <&RoomId>::try_from(room.as_str()) else { continue }; let Some(Room::Joined(room)) = client.get_room(room) else { continue }; debug!("{id}: bridging message from {:?}/{:?} to {room:?}", msg.origin, msg.link); room.send(content.clone(), None).await?; } Ok(()) }