diff options
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 49 |
1 files changed, 42 insertions, 7 deletions
diff --git a/src/main.rs b/src/main.rs index 396c66a..b071620 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,15 @@ // SPDX-FileCopyrightText: 2025 Tomasz Kramkowski <tomasz@kramkow.ski> // SPDX-License-Identifier: GPL-3.0-or-later -// TODO: Log levels - use std::{ + os::unix::process::ExitStatusExt, path::PathBuf, process::{ExitStatus, Stdio}, rc::Rc, }; use anyhow::Context; +use log::{debug, error, trace, warn}; use rumqttc::{Event::Incoming, Packet, Publish, QoS}; use tokio::{io::AsyncWriteExt, process::Command, time::timeout}; @@ -18,6 +18,7 @@ mod config; const PROGRAM: &str = "mqttr"; async fn run(program: &[String], message: &Publish) -> anyhow::Result<ExitStatus> { + debug!("Starting program {program:?} for message {message:?}"); let mut command = Command::new(&program[0]); command .args(&program[1..]) @@ -29,6 +30,10 @@ async fn run(program: &[String], message: &Publish) -> anyhow::Result<ExitStatus command.arg(format!("{}", message.pkid)); } let mut proc = command.stdin(Stdio::piped()).spawn()?; + trace!( + "Started program {program:?} with PID {}", + proc.id().expect("missing PID") + ); let mut stdin = proc.stdin.take().context("No stdin")?; stdin.write_all(&message.payload).await?; drop(stdin); @@ -76,21 +81,38 @@ async fn main() -> anyhow::Result<()> { conf_path.push(format!("{PROGRAM}.toml")); let conf = config::load(&conf_path) .with_context(|| format!("Failed to load config: {:?}", &conf_path))?; + stderrlog::new() + .color(stderrlog::ColorChoice::Never) + .module(module_path!()) + .verbosity(conf.log.level) + .timestamp(if conf.log.timestamps { + stderrlog::Timestamp::Millisecond + } else { + stderrlog::Timestamp::Off + }) + .init() + .unwrap(); + // TODO: This will print creds + trace!("Configuration: {conf:?}"); let (client, mut event_loop) = conf.mqtt_client(); for (topic, route) in conf.routes.iter() { if let Err(e) = client.subscribe(topic, route.qos.unwrap_or(conf.qos)).await { - eprintln!("warning: Failed to subscribe to '{topic}': {e:?}"); + warn!("Failed to subscribe to '{topic}': {e:?}"); + } else { + debug!("Subscribed to: '{topic}'"); } } moro_local::async_scope!(|scope| -> anyhow::Result<()> { loop { let notification = event_loop.poll().await; if let Incoming(Packet::Publish(p)) = notification? { + debug!("Received message: {p:?}"); let p = Rc::new(p); for (topic, route) in conf.routes.iter() { if !topic_match(topic, &p.topic) { continue; } + debug!("Message {p:?} matched topic {topic}"); for program in route.programs.iter() { let p = p.clone(); scope.spawn(async move { @@ -100,11 +122,24 @@ async fn main() -> anyhow::Result<()> { ) .await { - Err(_) => eprintln!( - "error: Execution of {program:?} for message {p:?} timed out" + Err(_) => error!( + "Execution of {program:?} for message {p:?} timed out" ), - Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"), - _ => (), + Ok(Err(e)) => error!("error: Failed to run {program:?}: {e:?}"), + Ok(Ok(c)) => { + if !c.success() { + if let Some(code) = c.code() { + if code != 0 { + warn!("Program exited with non-zero exit code: {code}") + } else { + debug!("Program exited successfully."); + } + } else if let Some(signal) = c.signal() { + let core_dumped = if c.core_dumped() { " (core dumped)" } else { "" }; + warn!("Program received signal: {signal}{core_dumped}"); + } + } + }, } }); } |