use std::sync::Arc; use async_trait::async_trait; use futures::StreamExt; use log::debug; use matrix_sdk::{Client, config::SyncSettings, event_handler::Ctx, room::Room}; use matrix_sdk::ruma::{RoomId, MxcUri}; use matrix_sdk::ruma::events::room::message::{SyncRoomMessageEvent, RoomMessageEventContent}; use serde::Deserialize; use tokio::select; use crate::linkmap::Linkmap; use crate::supervisor::{Message, Id, Sender, Receiver, Task, TaskResult}; use super::MatrixConfig; #[derive(Debug, Deserialize)] pub struct MatrixTask { config: MatrixConfig, links: Arc>, } struct Context { id: Id, tx: Sender, suffix: Arc, rooms: Arc>, } #[async_trait] impl Task for MatrixTask { async fn start(&self, id: Id, tx: Sender, mut rx: Receiver) -> TaskResult { debug!("{id}: building client"); let client = Client::builder() .server_name(self.config.user.server_name()) .build().await?; debug!("{id}: logging in"); client.login_username(&self.config.user, &self.config.password) .send() .await?; client.add_event_handler_context(Arc::new(Context { id, tx, suffix: self.config.suffix.clone(), rooms: self.links.clone() })); client.add_event_handler(handle_matrix_msg); let mut sync_stream = Box::pin(client.sync_stream(SyncSettings::default()).await); debug!("{id}: entering event loop"); loop { select! { response = sync_stream.next() => match response { Some(Ok(_)) => (), Some(Err(e)) => return Err(e.into()), None => return Err("Lost connection to Matrix server".into()) }, message = rx.recv() => match message { Ok(msg) => handle_bridge_msg(id, &client, &self.links, msg).await?, Err(e) => return Err(e.into()) } } } } } async fn mxc_thumbnail(client: &Client, mxc: &MxcUri, width: u32, height: u32) -> Option { let mut hs = client.homeserver().await; let (server_name, media_id) = mxc.parts().ok()?; hs.path_segments_mut().ok()? .extend(["_matrix", "media", "v3", "thumbnail"]) .push(server_name.as_str()) .push(media_id); hs.query_pairs_mut() .append_pair("width", &width.to_string()) .append_pair("height", &height.to_string()); Some(hs.to_string()) } async fn handle_matrix_msg(ev: SyncRoomMessageEvent, client: Client, room: Room, ctx: Ctx>) -> TaskResult { if Some(ev.sender()) == client.user_id() { return Ok(()) } let SyncRoomMessageEvent::Original(ev) = ev else { return Ok(()) }; let Ok(Some(sender)) = room.get_member(&ev.sender).await else { return Ok(()) }; let Some(link) = ctx.rooms.get_link(room.room_id().as_str()) else { return Ok(()) }; let body = ev.content.body(); let avatar = match sender.avatar_url() { None => None, Some(u) => mxc_thumbnail(&client, u, 128, 128).await, }; debug!("{}: broadcasting message from {:?} ({}) to {:?}", ctx.id, room.name(), room.room_id(), link); ctx.tx.send(Message { origin: (ctx.id, room.room_id().to_string()), link: link.clone(), author: sender.name().to_owned(), suffix: ctx.suffix.clone(), content: body.to_owned(), avatar, })?; Ok(()) } async fn handle_bridge_msg(id: Id, client: &Client, links: &Linkmap, msg: Message) -> TaskResult { let content = RoomMessageEventContent::text_plain( format!("<{}> {}", msg.author, msg.content) ); for room in links.get_channels(&msg.link) { if msg.origin.0 == id && &msg.origin.1 == room { continue; } let Ok(room) = <&RoomId>::try_from(room.as_str()) else { continue }; let Some(Room::Joined(room)) = client.get_room(room) else { continue }; debug!("{id}: bridging message from {:?}/{:?} to {:?} ({})", msg.origin, msg.link, room.name(), room.room_id()); room.send(content.clone(), None).await?; } Ok(()) }