Initial commit

This commit is contained in:
TriMill 2022-07-03 16:50:25 -04:00
commit 3b81b5a959
8 changed files with 1426 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

1096
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

15
Cargo.toml Normal file
View File

@ -0,0 +1,15 @@
[package]
name = "rss_bundler"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
chrono = "0.4.19"
reqwest = { version = "0.11.11", features = ["blocking"]}
rss = { version = "2.0.1", default_features = false }
serde = { version = "1.0.138", features = ["derive"] }
serde_json = "1.0.82"
strfmt = "0.1.6"
tiny_http = "0.11.0"

40
src/config.rs Normal file
View File

@ -0,0 +1,40 @@
use serde::{Serialize, Deserialize};
fn const_true() -> bool { true }
fn default_timeout() -> u64 { 60 }
fn default_port() -> u16 { 4400 }
fn default_host() -> String { "127.0.0.1".into() }
fn default_worker_threads() -> usize { 4 }
fn default_title_format() -> String { "[{name}] {title}".into() }
fn default_default_title() -> String { "<untitled>".into() }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
pub title: String,
pub link: String,
#[serde(default)]
pub description: String,
#[serde(default="default_default_title")]
pub default_title: String,
#[serde(default="default_timeout")]
pub refresh_time: u64,
#[serde(default="const_true")]
pub status_page: bool,
#[serde(default="default_title_format")]
pub title_format: String,
#[serde(default="default_worker_threads")]
pub worker_threads: usize,
#[serde(default="default_port")]
pub port: u16,
#[serde(default="default_host")]
pub host: String,
pub users: Vec<User>,
}
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub struct User {
pub name: String,
pub rss: String,
}

79
src/junction.rs Normal file
View File

@ -0,0 +1,79 @@
use std::collections::HashMap;
use chrono::{DateTime, SubsecRound};
use rss::Channel;
use strfmt::strfmt;
use crate::Feed;
use crate::config::{Config, User};
pub fn bundle_rss(feeds: &HashMap<User, Feed>, config: &Config) -> Channel {
let mut bundle = Channel::default();
bundle.set_title(&config.title);
bundle.set_link(&config.link);
bundle.description = config.description.clone();
bundle.set_generator(Some("RSS Bundler".into()));
let mut most_recent_date = None;
for (user, feed) in feeds {
if let Some(channel) = &feed.channel {
for item in channel.items() {
if let Some(pub_date) = &item.pub_date {
if let Ok(date) = DateTime::parse_from_rfc2822(pub_date) {
match most_recent_date {
None => most_recent_date = Some(date),
Some(d) if date > d => most_recent_date = Some(date),
_ => ()
}
}
}
let mut item = item.clone();
let item_title = {
let title = item.title.as_ref().unwrap_or(&config.default_title);
let mut args = HashMap::new();
args.insert("title".into(), title);
args.insert("name".into(), &user.name);
match strfmt(&config.title_format, &args) {
Ok(res) => res,
Err(e) => {
eprintln!("Format string error: {}. Using default format string instead.", e);
format!("[{}] {}", title, user.name)
}
}
};
item.set_title(item_title);
if item.author.is_none() {
item.set_author(user.name.clone());
}
bundle.items.push(item.clone());
}
}
}
if let Some(date) = most_recent_date {
bundle.set_pub_date(date.to_rfc2822());
}
bundle
}
pub fn gen_status(feeds: &HashMap<User, Feed>) -> String {
let max_user_length = feeds.iter()
.map(|(user, _)| user.name.len())
.max().unwrap_or(0).max(4);
let max_timestamp_length = feeds.iter()
.map(|(_, feed)| feed.last_fetched.round_subsecs(0).to_rfc3339().len())
.max().unwrap_or(0).max(12);
let mut lines = vec![
format!("{:w_user$}\t{:w_time$}\t{:6}\t{}",
"USER", "LAST SUCCESS", "STATUS", "ERROR",
w_user=max_user_length, w_time=max_timestamp_length)
];
for (user, feed) in feeds {
let (status, error) = match &feed.error_message {
Some(e) => ("ERROR", e.as_str()),
None => ("OK", ""),
};
lines.push(format!("{:w_user$}\t{:w_time$}\t{:6}\t{}",
user.name, feed.last_fetched.round_subsecs(0).to_rfc3339(), status, error,
w_user=max_user_length, w_time=max_timestamp_length));
}
lines.join("\n")
}

90
src/main.rs Normal file
View File

@ -0,0 +1,90 @@
#![warn(clippy::pedantic)]
use std::{collections::HashMap, thread, sync::{Mutex, Arc}, time::Duration, env::args, process::ExitCode, fs};
use chrono::{DateTime, Utc};
use config::Config;
use query::update_feeds;
use rss::Channel;
use crate::{junction::{bundle_rss, gen_status}};
mod config;
mod query;
mod junction;
mod server;
#[derive(Clone, Debug)]
pub struct Feed {
channel: Option<Channel>,
last_fetched: DateTime<Utc>,
error_message: Option<String>,
}
pub struct State {
rss: String,
status: Option<String>,
}
fn main() -> ExitCode {
let mut args = args();
let exe = args.next();
let config_file = args.next();
let config_file = match &config_file {
Some(s) if s == "--help" => {
eprintln!(
"Usage: {} <config-file>\nDocumentation available at https://github.com/trimill/rss-bundler",
exe.unwrap_or_else(|| "rssbundler".into()));
return 0.into()
}
Some(file) => file,
None => {
eprintln!("No config file provided.");
return 1.into()
}
};
let config = match load_config(config_file) {
Ok(config) => config,
Err(e) => {
eprintln!("Error loading config: {}", e);
return 1.into()
}
};
let mut feeds = HashMap::new();
let state = State {
rss: "".into(),
status: None,
};
let state = Arc::new(Mutex::new(state));
let server_address = format!("{}:{}", config.host, config.port);
println!("Starting server at {}", server_address);
let server_threads = server::start(&server_address, config.worker_threads, state.clone());
drop(server_threads);
let sleep_duration = Duration::from_secs(60 * config.refresh_time);
loop {
update_feeds(&mut feeds, &config);
let bundle = bundle_rss(&feeds, &config);
let status = if config.status_page {
Some(gen_status(&feeds))
} else { None };
let mut guard = state.lock().unwrap();
guard.status = status;
guard.rss = bundle.to_string();
drop(guard);
println!("Feeds updated");
thread::sleep(sleep_duration);
}
}
fn load_config(config_file: &str) -> Result<Config, Box<dyn std::error::Error>> {
let content = fs::read_to_string(config_file)?;
let config: Config = serde_json::from_str(&content)?;
Ok(config)
}

48
src/query.rs Normal file
View File

@ -0,0 +1,48 @@
use std::{collections::HashMap, time::Duration, str::FromStr};
use chrono::{Utc, TimeZone};
use reqwest::blocking::Client;
use rss::Channel;
use crate::Feed;
use crate::config::{User, Config};
pub fn update_feeds(feeds: &mut HashMap<User, Feed>, config: &Config) {
let client = Client::new();
for user in &config.users {
let feed = match feeds.get_mut(user) {
Some(feed) => feed,
None => {
let feed = Feed {
channel: None,
error_message: None,
last_fetched: Utc.ymd(1970, 1, 1).and_hms(0, 0, 0)
};
feeds.insert(user.clone(), feed);
feeds.get_mut(user).unwrap()
}
};
let res = client.get(&user.rss)
.timeout(Duration::from_secs(5))
.send();
let time = Utc::now();
match res {
Ok(res) if res.status().is_success() => match res.text() {
Ok(text) => match Channel::from_str(&text) {
Ok(channel) => {
feed.last_fetched = time;
feed.error_message = None;
feed.channel = Some(channel);
},
Err(e) => feed.error_message = Some(e.to_string())
},
Err(e) => feed.error_message = Some(e.to_string()),
},
Ok(res) => match res.status().canonical_reason() {
Some(reason) => feed.error_message = Some(format!("HTTP {} ({})", res.status().as_str(), reason)),
None => feed.error_message = Some(format!("HTTP {} (unknown)", res.status().as_str())),
},
Err(e) => feed.error_message = Some(e.to_string()),
}
}
}

57
src/server.rs Normal file
View File

@ -0,0 +1,57 @@
use std::{sync::{Arc, Mutex}, thread::{self, JoinHandle}};
use reqwest::Url;
use tiny_http::Response;
use crate::State;
pub enum Void {}
pub fn start(address: &str, thread_count: usize, state: Arc<Mutex<State>>) -> Vec<JoinHandle<Void>> {
let server = tiny_http::Server::http(address.to_owned()).unwrap();
let server = Arc::new(server);
let state = Arc::new(state);
let mut threads = Vec::with_capacity(thread_count);
for i in 0..thread_count {
let server = server.clone();
let state = state.clone();
let address = address.to_owned();
let thread = thread::spawn(move || {
loop {
let rq = server.recv().unwrap();
println!("[{}] {:?}", i, rq);
let full_url = "http://".to_string() + &address + rq.url();
let url = match Url::parse(&full_url) {
Ok(url) => url,
Err(e) => {
let result = rq.respond(Response::from_string(e.to_string()).with_status_code(400));
if let Err(e) = result {
eprintln!("Error responding to request: {}", e);
}
continue
}
};
let res = match url.path() {
"/rss.xml" => {
let guard = state.lock().unwrap();
let header = tiny_http::Header::from_bytes(&b"Content-Type"[..], &b"text/xml"[..]).unwrap();
Response::from_string(&guard.rss).with_header(header)
},
"/status" => {
let guard = state.lock().unwrap();
Response::from_string(guard.status.as_ref().unwrap_or(&"Status page disabled".into()))
},
_ => Response::from_string("Not found").with_status_code(404)
};
let result = rq.respond(res);
if let Err(e) = result {
eprintln!("Error responding to request: {}", e);
}
}
});
threads.push(thread);
}
threads
}