// SPDX-FileCopyrightText: 2025 Tomasz Kramkowski // SPDX-License-Identifier: GPL-3.0-or-later use std::{ os::unix::process::ExitStatusExt, path::PathBuf, process::{ExitStatus, Stdio}, rc::Rc, }; use anyhow::Context; use log::{debug, error, trace, warn}; use rumqttc::{Event::Incoming, Packet, Publish, QoS}; use tokio::{io::AsyncWriteExt, process::Command, time::timeout}; mod config; mod mqtt; const PROGRAM: &str = "mqttr"; async fn run(program: &[String], message: &Publish) -> anyhow::Result { debug!("Starting program {program:?} for message {message:?}"); let mut command = Command::new(&program[0]); command .args(&program[1..]) .arg(&message.topic) .arg(format!("{}", message.dup as u8)) .arg(format!("{}", message.qos as u8)) .arg(format!("{}", message.retain as u8)); if message.qos == QoS::AtLeastOnce || message.qos == QoS::ExactlyOnce { command.arg(format!("{}", message.pkid)); } let mut proc = command.stdin(Stdio::piped()).spawn()?; trace!( "Started program {program:?} with PID {}", proc.id().expect("missing PID") ); let mut stdin = proc.stdin.take().context("No stdin")?; stdin.write_all(&message.payload).await?; drop(stdin); let result = proc.wait().await?; Ok(result) } #[tokio::main(flavor = "current_thread")] async fn main() -> anyhow::Result<()> { let mut conf_path: PathBuf = option_env!("SYSCONFDIR").unwrap_or("/usr/local/etc").into(); conf_path.push(format!("{PROGRAM}.toml")); let conf = config::load(&conf_path) .with_context(|| format!("Failed to load config: {:?}", &conf_path))?; stderrlog::new() .color(stderrlog::ColorChoice::Never) .module(module_path!()) .verbosity(conf.log.level) .timestamp(if conf.log.timestamps { stderrlog::Timestamp::Millisecond } else { stderrlog::Timestamp::Off }) .init() .unwrap(); // TODO: This will print creds trace!("Configuration: {conf:?}"); let (client, mut event_loop) = conf.mqtt_client(); for (topic, route) in conf.routes.iter() { if let Err(e) = client.subscribe(topic, route.qos.unwrap_or(conf.qos)).await { warn!("Failed to subscribe to '{topic}': {e:?}"); } else { debug!("Subscribed to: '{topic}'"); } } moro_local::async_scope!(|scope| -> anyhow::Result<()> { loop { let notification = event_loop.poll().await; if let Incoming(Packet::Publish(p)) = notification? { debug!("Received message: {p:?}"); let p = Rc::new(p); for (topic, route) in conf.routes.iter() { if !mqtt::topic_match(topic, &p.topic) { continue; } debug!("Message {p:?} matched topic {topic}"); for program in route.programs.iter() { let p = p.clone(); scope.spawn(async move { match timeout( program.timeout.unwrap_or(conf.timeout), run(&program.command, &p), ) .await { Err(_) => error!( "Execution of {program:?} for message {p:?} timed out" ), Ok(Err(e)) => error!("error: Failed to run {program:?}: {e:?}"), Ok(Ok(c)) => { if !c.success() { if let Some(code) = c.code() { if code != 0 { warn!("Program exited with non-zero exit code: {code}") } else { debug!("Program exited successfully."); } } else if let Some(signal) = c.signal() { let core_dumped = if c.core_dumped() { " (core dumped)" } else { "" }; warn!("Program received signal: {signal}{core_dumped}"); } } }, } }); } } } } }) .await }