129 lines
3.7 KiB
Rust
129 lines
3.7 KiB
Rust
use std::sync::Arc;
|
|
|
|
use async_trait::async_trait;
|
|
use futures::StreamExt;
|
|
use log::debug;
|
|
use matrix_sdk::{Client, config::SyncSettings, event_handler::Ctx, room::Room};
|
|
use matrix_sdk::ruma::{RoomId, MxcUri};
|
|
use matrix_sdk::ruma::events::room::message::{SyncRoomMessageEvent, RoomMessageEventContent};
|
|
use serde::Deserialize;
|
|
use tokio::select;
|
|
use crate::linkmap::Linkmap;
|
|
use crate::supervisor::{Message, Id, Sender, Receiver, Task, TaskResult};
|
|
|
|
use super::MatrixConfig;
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
pub struct MatrixTask {
|
|
config: MatrixConfig,
|
|
links: Arc<Linkmap<String>>,
|
|
}
|
|
|
|
struct Context {
|
|
id: Id,
|
|
tx: Sender,
|
|
suffix: Arc<str>,
|
|
rooms: Arc<Linkmap<String>>,
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Task for MatrixTask {
|
|
async fn start(&self, id: Id, tx: Sender, mut rx: Receiver) -> TaskResult {
|
|
debug!("{id}: building client");
|
|
let client = Client::builder()
|
|
.server_name(self.config.user.server_name())
|
|
.build().await?;
|
|
|
|
debug!("{id}: logging in");
|
|
client.login_username(&self.config.user, &self.config.password)
|
|
.send()
|
|
.await?;
|
|
|
|
client.add_event_handler_context(Arc::new(Context {
|
|
id,
|
|
tx,
|
|
suffix: self.config.suffix.clone(),
|
|
rooms: self.links.clone()
|
|
}));
|
|
client.add_event_handler(handle_matrix_msg);
|
|
|
|
let mut sync_stream = Box::pin(client.sync_stream(SyncSettings::default()).await);
|
|
|
|
debug!("{id}: entering event loop");
|
|
|
|
loop {
|
|
select! {
|
|
response = sync_stream.next() => match response {
|
|
Some(Ok(_)) => (),
|
|
Some(Err(e)) => return Err(e.into()),
|
|
None => return Err("Lost connection to Matrix server".into())
|
|
},
|
|
message = rx.recv() => match message {
|
|
Ok(msg) => handle_bridge_msg(id, &client, &self.links, msg).await?,
|
|
Err(e) => return Err(e.into())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn mxc_thumbnail(client: &Client, mxc: &MxcUri, width: u32, height: u32) -> Option<String> {
|
|
let mut hs = client.homeserver().await;
|
|
|
|
let (server_name, media_id) = mxc.parts().ok()?;
|
|
|
|
hs.path_segments_mut().ok()?
|
|
.extend(["_matrix", "media", "v3", "thumbnail"])
|
|
.push(server_name.as_str())
|
|
.push(media_id);
|
|
|
|
hs.query_pairs_mut()
|
|
.append_pair("width", &width.to_string())
|
|
.append_pair("height", &height.to_string());
|
|
|
|
Some(hs.to_string())
|
|
}
|
|
|
|
async fn handle_matrix_msg(ev: SyncRoomMessageEvent, client: Client, room: Room, ctx: Ctx<Arc<Context>>) -> TaskResult {
|
|
if Some(ev.sender()) == client.user_id() { return Ok(()) }
|
|
let SyncRoomMessageEvent::Original(ev) = ev else { return Ok(()) };
|
|
let Ok(Some(sender)) = room.get_member(&ev.sender).await else { return Ok(()) };
|
|
let Some(link) = ctx.rooms.get_link(room.room_id().as_str()) else { return Ok(()) };
|
|
|
|
let body = ev.content.body();
|
|
let avatar = match sender.avatar_url() {
|
|
None => None,
|
|
Some(u) => mxc_thumbnail(&client, u, 128, 128).await,
|
|
};
|
|
|
|
debug!("{}: broadcasting message from {:?} ({}) to {:?}", ctx.id, room.name(), room.room_id(), link);
|
|
ctx.tx.send(Message {
|
|
origin: (ctx.id, room.room_id().to_string()),
|
|
link: link.clone(),
|
|
author: sender.name().to_owned(),
|
|
suffix: ctx.suffix.clone(),
|
|
content: body.to_owned(),
|
|
avatar,
|
|
})?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_bridge_msg(id: Id, client: &Client, links: &Linkmap<String>, msg: Message) -> TaskResult {
|
|
let content = RoomMessageEventContent::text_plain(
|
|
format!("<{}> {}", msg.author, msg.content)
|
|
);
|
|
|
|
for room in links.get_channels(&msg.link) {
|
|
if msg.origin.0 == id && &msg.origin.1 == room {
|
|
continue;
|
|
}
|
|
|
|
let Ok(room) = <&RoomId>::try_from(room.as_str()) else { continue };
|
|
let Some(Room::Joined(room)) = client.get_room(room) else { continue };
|
|
|
|
debug!("{id}: bridging message from {:?}/{:?} to {:?} ({})", msg.origin, msg.link, room.name(), room.room_id());
|
|
room.send(content.clone(), None).await?;
|
|
}
|
|
Ok(())
|
|
}
|