diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/config.rs | 32 | ||||
-rw-r--r-- | src/main.rs | 49 |
2 files changed, 74 insertions, 7 deletions
diff --git a/src/config.rs b/src/config.rs index 2ba10ec..c110825 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,6 +7,7 @@ use std::{ }; use anyhow::bail; +use log::LevelFilter; use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS}; use serde::{ de::{self, Visitor}, @@ -15,6 +16,23 @@ use serde::{ use crate::PROGRAM; +#[derive(Deserialize, Debug, PartialEq)] +pub struct Logging { + #[serde(default = "default_level_filter")] + pub level: LevelFilter, + #[serde(default)] // Off + pub timestamps: bool, +} + +impl Default for Logging { + fn default() -> Self { + Self { + level: default_level_filter(), + timestamps: bool::default(), + } + } +} + #[derive(Deserialize, Debug)] pub struct Credentials { pub username: String, @@ -41,6 +59,10 @@ fn default_timeout() -> Duration { Duration::from_secs(60) } +fn default_level_filter() -> LevelFilter { + LevelFilter::Info +} + #[allow(clippy::enum_variant_names)] #[derive(Deserialize, Debug)] #[serde(remote = "QoS", rename_all = "kebab-case")] @@ -248,6 +270,8 @@ pub struct Config { pub qos: QoS, #[serde(default = "default_timeout", deserialize_with = "deserialize_timeout")] pub timeout: Duration, + #[serde(default)] + pub log: Logging, pub credentials: Option<Credentials>, #[serde(default = "default_id")] pub id: String, @@ -317,6 +341,10 @@ mod tests { username = "testuser" password = "testpassword" + [log] + level = "trace" + timestamps = true + [routes] "topic/map" = { programs = [ ["/bin/program1"], @@ -341,6 +369,9 @@ mod tests { assert_eq!(creds.username, "testuser"); assert_eq!(creds.password, "testpassword"); + assert_eq!(config.log.level, LevelFilter::Trace); + assert_eq!(config.log.timestamps, true); + assert_eq!(config.routes.len(), 2); let route_map = config.routes.get("topic/map").unwrap(); @@ -377,6 +408,7 @@ mod tests { assert_eq!(config.id, default_id()); assert_eq!(config.timeout, default_timeout()); assert!(config.credentials.is_none()); + assert_eq!(config.log, Logging::default()); assert!(config.routes.is_empty()); } diff --git a/src/main.rs b/src/main.rs index 396c66a..b071620 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,15 @@ // SPDX-FileCopyrightText: 2025 Tomasz Kramkowski <tomasz@kramkow.ski> // SPDX-License-Identifier: GPL-3.0-or-later -// TODO: Log levels - use std::{ + os::unix::process::ExitStatusExt, path::PathBuf, process::{ExitStatus, Stdio}, rc::Rc, }; use anyhow::Context; +use log::{debug, error, trace, warn}; use rumqttc::{Event::Incoming, Packet, Publish, QoS}; use tokio::{io::AsyncWriteExt, process::Command, time::timeout}; @@ -18,6 +18,7 @@ mod config; const PROGRAM: &str = "mqttr"; async fn run(program: &[String], message: &Publish) -> anyhow::Result<ExitStatus> { + debug!("Starting program {program:?} for message {message:?}"); let mut command = Command::new(&program[0]); command .args(&program[1..]) @@ -29,6 +30,10 @@ async fn run(program: &[String], message: &Publish) -> anyhow::Result<ExitStatus command.arg(format!("{}", message.pkid)); } let mut proc = command.stdin(Stdio::piped()).spawn()?; + trace!( + "Started program {program:?} with PID {}", + proc.id().expect("missing PID") + ); let mut stdin = proc.stdin.take().context("No stdin")?; stdin.write_all(&message.payload).await?; drop(stdin); @@ -76,21 +81,38 @@ async fn main() -> anyhow::Result<()> { conf_path.push(format!("{PROGRAM}.toml")); let conf = config::load(&conf_path) .with_context(|| format!("Failed to load config: {:?}", &conf_path))?; + stderrlog::new() + .color(stderrlog::ColorChoice::Never) + .module(module_path!()) + .verbosity(conf.log.level) + .timestamp(if conf.log.timestamps { + stderrlog::Timestamp::Millisecond + } else { + stderrlog::Timestamp::Off + }) + .init() + .unwrap(); + // TODO: This will print creds + trace!("Configuration: {conf:?}"); let (client, mut event_loop) = conf.mqtt_client(); for (topic, route) in conf.routes.iter() { if let Err(e) = client.subscribe(topic, route.qos.unwrap_or(conf.qos)).await { - eprintln!("warning: Failed to subscribe to '{topic}': {e:?}"); + warn!("Failed to subscribe to '{topic}': {e:?}"); + } else { + debug!("Subscribed to: '{topic}'"); } } moro_local::async_scope!(|scope| -> anyhow::Result<()> { loop { let notification = event_loop.poll().await; if let Incoming(Packet::Publish(p)) = notification? { + debug!("Received message: {p:?}"); let p = Rc::new(p); for (topic, route) in conf.routes.iter() { if !topic_match(topic, &p.topic) { continue; } + debug!("Message {p:?} matched topic {topic}"); for program in route.programs.iter() { let p = p.clone(); scope.spawn(async move { @@ -100,11 +122,24 @@ async fn main() -> anyhow::Result<()> { ) .await { - Err(_) => eprintln!( - "error: Execution of {program:?} for message {p:?} timed out" + Err(_) => error!( + "Execution of {program:?} for message {p:?} timed out" ), - Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"), - _ => (), + Ok(Err(e)) => error!("error: Failed to run {program:?}: {e:?}"), + Ok(Ok(c)) => { + if !c.success() { + if let Some(code) = c.code() { + if code != 0 { + warn!("Program exited with non-zero exit code: {code}") + } else { + debug!("Program exited successfully."); + } + } else if let Some(signal) = c.signal() { + let core_dumped = if c.core_dumped() { " (core dumped)" } else { "" }; + warn!("Program received signal: {signal}{core_dumped}"); + } + } + }, } }); } |