abridged/src/bridge_matrix/bridge.rs

126 lines
3.6 KiB
Rust

use std::sync::Arc;
use async_trait::async_trait;
use futures::StreamExt;
use log::debug;
use matrix_sdk::{Client, config::SyncSettings, event_handler::Ctx, room::Room};
use matrix_sdk::ruma::{RoomId, MxcUri};
use matrix_sdk::ruma::events::room::message::{SyncRoomMessageEvent, RoomMessageEventContent};
use serde::Deserialize;
use tokio::select;
use crate::linkmap::Linkmap;
use crate::supervisor::{Message, Id, Sender, Receiver, Task, TaskResult};
use super::MatrixConfig;
/// Bridge task for Matrix, driven through its `Task` implementation.
///
/// The struct is deserializable; `start` uses it to build a client, log in,
/// and relay messages between Matrix rooms and the supervisor channel.
#[derive(Debug, Deserialize)]
pub struct MatrixTask {
/// Matrix settings; `start` reads `user` (also used to derive the server
/// name) and `password` from it when logging in.
config: MatrixConfig,
/// Link map shared with the Matrix event handler. It is queried with room id
/// strings (`get_link`) and with a message's link (`get_channels`).
links: Arc<Linkmap<String>>,
}
/// State registered on the client with `add_event_handler_context` and read
/// by `handle_matrix_msg` for every incoming room message.
struct Context {
/// Id of the task that owns the client; used as a log prefix and as the
/// first half of an outgoing message's `origin`.
id: Id,
/// Sender on which messages received from Matrix are forwarded.
tx: Sender,
/// Link map used to look up the link for the room a message arrived in.
rooms: Arc<Linkmap<String>>,
}
#[async_trait]
impl Task for MatrixTask {
    /// Runs the Matrix side of the bridge until an error occurs.
    ///
    /// Builds a client for the configured user's server, logs in, registers
    /// `handle_matrix_msg` together with its shared [`Context`], and then
    /// alternates between driving the sync stream and relaying messages that
    /// arrive on `rx` into Matrix via `handle_bridge_msg`.
    ///
    /// # Errors
    ///
    /// Returns an error when building the client or logging in fails, when the
    /// sync stream yields an error or ends, when receiving from `rx` fails, or
    /// when `handle_bridge_msg` fails. The loop has no successful exit.
    async fn start(&self, id: Id, tx: Sender, mut rx: Receiver) -> TaskResult {
        debug!("{id}: building client");
        let client = Client::builder()
            .server_name(self.config.user.server_name())
            .build()
            .await?;

        debug!("{id}: logging in");
        client
            .login_username(&self.config.user, &self.config.password)
            .send()
            .await?;

        // The handler receives this context through its `Ctx` argument.
        let handler_ctx = Arc::new(Context {
            id,
            tx,
            rooms: Arc::clone(&self.links),
        });
        client.add_event_handler_context(handler_ctx);
        client.add_event_handler(handle_matrix_msg);

        let mut sync_stream = Box::pin(client.sync_stream(SyncSettings::default()).await);

        debug!("{id}: entering event loop");
        loop {
            select! {
                synced = sync_stream.next() => {
                    // A finished stream is treated as a lost connection.
                    let Some(outcome) = synced else {
                        return Err("Lost connection to Matrix server".into());
                    };
                    // Only the error matters here; the sync response is discarded.
                    outcome?;
                }
                incoming = rx.recv() => {
                    handle_bridge_msg(id, &client, &self.links, incoming?).await?;
                }
            }
        }
    }
}
/// Builds a thumbnail URL for the media referenced by `mxc`.
///
/// The result is the client's homeserver URL with
/// `_matrix/media/v3/thumbnail/{server_name}/{media_id}` appended to its path
/// and `width` / `height` added as query parameters.
///
/// Returns `None` when `mxc` cannot be split into a server name and a media
/// id, or when the homeserver URL does not support path segments.
async fn mxc_thumbnail(client: &Client, mxc: &MxcUri, width: u32, height: u32) -> Option<String> {
    let mut hs = client.homeserver().await;
    let (server_name, media_id) = mxc.parts().ok()?;
    hs.path_segments_mut().ok()?
        // A homeserver URL such as `https://host/base/` ends in an empty path
        // segment; drop it so the result is `/base/_matrix/...` rather than
        // `/base//_matrix/...`. A bare root path is left untouched.
        .pop_if_empty()
        .extend(["_matrix", "media", "v3", "thumbnail"])
        .push(server_name.as_str())
        .push(media_id);
    hs.query_pairs_mut()
        .append_pair("width", &width.to_string())
        .append_pair("height", &height.to_string());
    Some(hs.to_string())
}
/// Event handler for room messages: forwards a message received from Matrix
/// to the supervisor channel held in the handler context.
///
/// The event is silently ignored (returning `Ok(())`) when:
/// - it was sent by the logged-in user itself,
/// - it is not an `Original` message event,
/// - the room has no entry in the link map, or
/// - the sender cannot be resolved to a room member.
///
/// Otherwise a [`Message`] is sent carrying the room as origin, the room's
/// link, the sender's name, the message body, and a 128x128 thumbnail URL of
/// the sender's avatar if one is available.
///
/// # Errors
///
/// Returns an error only when sending on `ctx.tx` fails.
async fn handle_matrix_msg(ev: SyncRoomMessageEvent, client: Client, room: Room, ctx: Ctx<Arc<Context>>) -> TaskResult {
    // Never echo the bridge's own messages back out.
    if Some(ev.sender()) == client.user_id() { return Ok(()) }
    let SyncRoomMessageEvent::Original(ev) = ev else { return Ok(()) };

    // Do the local link-map lookup before the async member lookup so that
    // messages in rooms that are not bridged return without that extra work.
    let Some(link) = ctx.rooms.get_link(room.room_id().as_str()) else { return Ok(()) };
    let Ok(Some(sender)) = room.get_member(&ev.sender).await else { return Ok(()) };

    let avatar = match sender.avatar_url() {
        Some(mxc) => mxc_thumbnail(&client, mxc, 128, 128).await,
        None => None,
    };

    debug!("{}: broadcasting message from {:?} ({}) to {:?}", ctx.id, room.name(), room.room_id(), link);
    ctx.tx.send(Message {
        origin: (ctx.id, room.room_id().to_string()),
        link: link.clone(),
        author: sender.name().to_owned(),
        content: ev.content.body().to_owned(),
        avatar,
    })?;
    Ok(())
}
/// Relays a message from the supervisor channel into every Matrix room that
/// shares the message's link.
///
/// The text is sent as plain text in the form `<author> content`. A channel is
/// skipped when it is the exact origin of the message (same task id and same
/// channel), when it does not parse as a room id, or when the client has no
/// joined room for it.
///
/// # Errors
///
/// Returns the first send error; rooms later in the iteration are then not
/// attempted.
async fn handle_bridge_msg(id: Id, client: &Client, links: &Linkmap<String>, msg: Message) -> TaskResult {
    let text = format!("<{}> {}", msg.author, msg.content);
    let content = RoomMessageEventContent::text_plain(text);

    for channel in links.get_channels(&msg.link) {
        // Do not send a message back into the room it came from.
        let is_origin = msg.origin.0 == id && &msg.origin.1 == channel;
        if is_origin {
            continue;
        }

        let room_id = match <&RoomId>::try_from(channel.as_str()) {
            Ok(parsed) => parsed,
            Err(_) => continue,
        };
        let joined = match client.get_room(room_id) {
            Some(Room::Joined(joined)) => joined,
            _ => continue,
        };

        debug!("{id}: bridging message from {:?}/{:?} to {:?} ({})", msg.origin, msg.link, joined.name(), joined.room_id());
        joined.send(content.clone(), None).await?;
    }

    Ok(())
}