// SPDX-FileCopyrightText: 2025 Tomasz Kramkowski // SPDX-License-Identifier: GPL-3.0-or-later // TODO: Log levels use std::{path::PathBuf, process::Stdio, time::Duration}; use anyhow::Context; use rumqttc::{Event::Incoming, Packet, Publish, QoS}; use tokio::{io::AsyncWriteExt, process::Command, time::timeout}; mod config; const PROGRAM: &str = "mqttr"; async fn run(program: &[String], message: &Publish) -> anyhow::Result<()> { let mut command = Command::new(&program[0]); command .args(&program[1..]) .arg(&message.topic) .arg(format!("{}", message.dup as u8)) .arg(format!("{}", message.qos as u8)) .arg(format!("{}", message.retain as u8)); if message.qos == QoS::AtLeastOnce || message.qos == QoS::ExactlyOnce { command.arg(format!("{}", message.pkid)); } let mut proc = command.stdin(Stdio::piped()).spawn()?; let mut stdin = proc.stdin.take().context("No stdin")?; stdin.write(&message.payload).await?; drop(stdin); println!("{}", proc.wait().await?); Ok(()) } fn topic_match(filter: &str, topic: &str) -> bool { // TODO: Should probably just be a panic or prevented using types if filter.is_empty() || topic.is_empty() { return false; } if topic.starts_with('$') && (filter.starts_with('+') || filter.starts_with('#')) { return false; } // zip_longest would be nice let mut topic = topic.split('/'); let mut filter = filter.split('/'); loop { let topic_level = topic.next(); return match filter.next() { Some("#") => filter.next().is_none(), Some("+") => { if topic_level.is_none() { false } else { continue; } } Some(filter_level) => match topic_level { Some(topic_level) if topic_level == filter_level => { continue; } _ => false, }, None => topic_level.is_none(), }; } } #[tokio::main(flavor = "current_thread")] async fn main() -> anyhow::Result<()> { let mut conf_path: PathBuf = option_env!("SYSCONFDIR").unwrap_or("/usr/local/etc").into(); conf_path.push(format!("{PROGRAM}.toml")); let conf = config::load(&conf_path) .with_context(|| format!("Failed to load config: {:?}", &conf_path))?; let (client, mut event_loop) = conf.mqtt_client(); for topic in conf.routes.keys() { // TODO: Configurable subscription QoS if let Err(e) = client.subscribe(topic, QoS::ExactlyOnce).await { eprintln!("warning: Failed to subscribe to '{topic}': {e:?}"); } } loop { let notification = event_loop.poll().await; match notification? { Incoming(Packet::Publish(p)) => { for (topic, programs) in conf.routes.iter() { if !topic_match(&topic, &p.topic) { continue; } for program in programs { // TODO: Switch to moro_local to avoid this ewwyness let program = program.clone(); let p = p.clone(); tokio::spawn(async move { match timeout(Duration::from_secs(60), run(&program, &p)).await { Err(_) => eprintln!( "error: Execution of {program:?} for message {p:?} timed out" ), Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"), _ => (), } }); } } } _ => (), } } } #[cfg(test)] mod tests { use super::topic_match; #[test] fn topic_match_basic() { assert!(topic_match("foo/bar/baz", "foo/bar/baz")); assert!(!topic_match("foo/bar/baz", "foo/bar/qux")); assert!(!topic_match("foo/bar", "foo/bar/baz")); assert!(!topic_match("foo/bar/baz", "foo/bar")); } #[test] fn topic_match_wildcard_hash() { assert!(topic_match("foo/bar/baz/#", "foo/bar/baz")); assert!(topic_match("foo/bar/baz/#", "foo/bar/baz/qux")); assert!(topic_match("foo/bar/baz/#", "foo/bar/baz/qux/quux")); assert!(topic_match("#", "foo/bar/baz")); assert!(topic_match("#", "foo")); assert!(topic_match("#", "/")); assert!(topic_match("#", "/foo")); assert!(!topic_match("foo/bar/#", "foo/baz/bar")); assert!(!topic_match("foo/bar/#", "foo")); } #[test] fn topic_match_wildcard_plus() { assert!(topic_match("foo/bar/+", "foo/bar/baz")); assert!(topic_match("foo/bar/+", "foo/bar/qux")); assert!(!topic_match("foo/bar/+", "foo/bar/baz/qux")); assert!(topic_match("foo/+", "foo/")); assert!(!topic_match("foo/+", "foo")); assert!(topic_match("+", "foo")); assert!(topic_match("+/bar/#", "foo/bar/baz/qux")); assert!(topic_match("+/bar/#", "qux/bar")); assert!(topic_match("foo/+/baz", "foo/bar/baz")); assert!(topic_match("foo/+/baz", "foo/qux/baz")); assert!(!topic_match("foo/+/baz", "foo/bar/qux")); assert!(topic_match("+/+", "/foo")); assert!(topic_match("/+", "/foo")); assert!(!topic_match("+", "/foo")); } #[test] fn topic_match_dollar() { assert!(!topic_match("#", "$foo/bar")); assert!(!topic_match("+/bar/baz", "$foo/bar/baz")); assert!(topic_match("$foo/#", "$foo/bar")); assert!(topic_match("$foo/#", "$foo/bar/baz")); assert!(topic_match("$foo/#", "$foo")); assert!(topic_match("$foo/bar/+", "$foo/bar/baz")); assert!(!topic_match("$foo/#", "foo/bar")); } #[test] fn topic_match_edge_cases() { assert!(!topic_match("foo", "FOO")); assert!(topic_match("foo bar", "foo bar")); assert!(!topic_match("foo bar", "foo bar")); assert!(!topic_match("foo bar", "foo bar")); assert!(!topic_match("/foo", "foo")); assert!(!topic_match("foo", "/foo")); assert!(topic_match("foo//bar", "foo//bar")); assert!(!topic_match("foo/bar", "foo//bar")); assert!(!topic_match("foo//bar", "foo/bar")); assert!(!topic_match("foo//baz", "foo/bar/baz")); assert!(!topic_match("foo/bar/baz", "foo//baz")); assert!(topic_match("foo/+/baz", "foo//baz")); assert!(topic_match("/", "/")); assert!(!topic_match("/", "foo")); assert!(!topic_match("foo", "/")); assert!(!topic_match("", "")); assert!(!topic_match("+", "")); assert!(!topic_match("#", "")); assert!(!topic_match("foo/#/baz", "foo/bar/baz")); } }