aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorTomasz Kramkowski <tomasz@kramkow.ski>2025-07-10 00:20:08 +0100
committerTomasz Kramkowski <tomasz@kramkow.ski>2025-07-10 00:20:08 +0100
commit03e50ca399b44a26fefafe325ea40fa453b7d6ea (patch)
tree7efefb4eafdbff5adfb42b5013d2ea9fa6f06f31 /src
parentb37432b1f435f88e18eb7779fecfc33a6260cef3 (diff)
downloadmqttr-03e50ca399b44a26fefafe325ea40fa453b7d6ea.tar.gz
mqttr-03e50ca399b44a26fefafe325ea40fa453b7d6ea.tar.xz
mqttr-03e50ca399b44a26fefafe325ea40fa453b7d6ea.zip
Configurable logging
Would be good to maybe log more things, and maybe play around with log levels. But this is okay for now. stderrlog probably will go at some point, too many unnecessary transient dependencies
Diffstat (limited to 'src')
-rw-r--r--src/config.rs32
-rw-r--r--src/main.rs49
2 files changed, 74 insertions, 7 deletions
diff --git a/src/config.rs b/src/config.rs
index 2ba10ec..c110825 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -7,6 +7,7 @@ use std::{
};
use anyhow::bail;
+use log::LevelFilter;
use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS};
use serde::{
de::{self, Visitor},
@@ -15,6 +16,23 @@ use serde::{
use crate::PROGRAM;
+#[derive(Deserialize, Debug, PartialEq)]
+pub struct Logging {
+ #[serde(default = "default_level_filter")]
+ pub level: LevelFilter,
+ #[serde(default)] // Off
+ pub timestamps: bool,
+}
+
+impl Default for Logging {
+ fn default() -> Self {
+ Self {
+ level: default_level_filter(),
+ timestamps: bool::default(),
+ }
+ }
+}
+
#[derive(Deserialize, Debug)]
pub struct Credentials {
pub username: String,
@@ -41,6 +59,10 @@ fn default_timeout() -> Duration {
Duration::from_secs(60)
}
+fn default_level_filter() -> LevelFilter {
+ LevelFilter::Info
+}
+
#[allow(clippy::enum_variant_names)]
#[derive(Deserialize, Debug)]
#[serde(remote = "QoS", rename_all = "kebab-case")]
@@ -248,6 +270,8 @@ pub struct Config {
pub qos: QoS,
#[serde(default = "default_timeout", deserialize_with = "deserialize_timeout")]
pub timeout: Duration,
+ #[serde(default)]
+ pub log: Logging,
pub credentials: Option<Credentials>,
#[serde(default = "default_id")]
pub id: String,
@@ -317,6 +341,10 @@ mod tests {
username = "testuser"
password = "testpassword"
+ [log]
+ level = "trace"
+ timestamps = true
+
[routes]
"topic/map" = { programs = [
["/bin/program1"],
@@ -341,6 +369,9 @@ mod tests {
assert_eq!(creds.username, "testuser");
assert_eq!(creds.password, "testpassword");
+ assert_eq!(config.log.level, LevelFilter::Trace);
+ assert_eq!(config.log.timestamps, true);
+
assert_eq!(config.routes.len(), 2);
let route_map = config.routes.get("topic/map").unwrap();
@@ -377,6 +408,7 @@ mod tests {
assert_eq!(config.id, default_id());
assert_eq!(config.timeout, default_timeout());
assert!(config.credentials.is_none());
+ assert_eq!(config.log, Logging::default());
assert!(config.routes.is_empty());
}
diff --git a/src/main.rs b/src/main.rs
index 396c66a..b071620 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,15 +1,15 @@
// SPDX-FileCopyrightText: 2025 Tomasz Kramkowski <tomasz@kramkow.ski>
// SPDX-License-Identifier: GPL-3.0-or-later
-// TODO: Log levels
-
use std::{
+ os::unix::process::ExitStatusExt,
path::PathBuf,
process::{ExitStatus, Stdio},
rc::Rc,
};
use anyhow::Context;
+use log::{debug, error, trace, warn};
use rumqttc::{Event::Incoming, Packet, Publish, QoS};
use tokio::{io::AsyncWriteExt, process::Command, time::timeout};
@@ -18,6 +18,7 @@ mod config;
const PROGRAM: &str = "mqttr";
async fn run(program: &[String], message: &Publish) -> anyhow::Result<ExitStatus> {
+ debug!("Starting program {program:?} for message {message:?}");
let mut command = Command::new(&program[0]);
command
.args(&program[1..])
@@ -29,6 +30,10 @@ async fn run(program: &[String], message: &Publish) -> anyhow::Result<ExitStatus
command.arg(format!("{}", message.pkid));
}
let mut proc = command.stdin(Stdio::piped()).spawn()?;
+ trace!(
+ "Started program {program:?} with PID {}",
+ proc.id().expect("missing PID")
+ );
let mut stdin = proc.stdin.take().context("No stdin")?;
stdin.write_all(&message.payload).await?;
drop(stdin);
@@ -76,21 +81,38 @@ async fn main() -> anyhow::Result<()> {
conf_path.push(format!("{PROGRAM}.toml"));
let conf = config::load(&conf_path)
.with_context(|| format!("Failed to load config: {:?}", &conf_path))?;
+ stderrlog::new()
+ .color(stderrlog::ColorChoice::Never)
+ .module(module_path!())
+ .verbosity(conf.log.level)
+ .timestamp(if conf.log.timestamps {
+ stderrlog::Timestamp::Millisecond
+ } else {
+ stderrlog::Timestamp::Off
+ })
+ .init()
+ .unwrap();
+ // TODO: This will print creds
+ trace!("Configuration: {conf:?}");
let (client, mut event_loop) = conf.mqtt_client();
for (topic, route) in conf.routes.iter() {
if let Err(e) = client.subscribe(topic, route.qos.unwrap_or(conf.qos)).await {
- eprintln!("warning: Failed to subscribe to '{topic}': {e:?}");
+ warn!("Failed to subscribe to '{topic}': {e:?}");
+ } else {
+ debug!("Subscribed to: '{topic}'");
}
}
moro_local::async_scope!(|scope| -> anyhow::Result<()> {
loop {
let notification = event_loop.poll().await;
if let Incoming(Packet::Publish(p)) = notification? {
+ debug!("Received message: {p:?}");
let p = Rc::new(p);
for (topic, route) in conf.routes.iter() {
if !topic_match(topic, &p.topic) {
continue;
}
+ debug!("Message {p:?} matched topic {topic}");
for program in route.programs.iter() {
let p = p.clone();
scope.spawn(async move {
@@ -100,11 +122,24 @@ async fn main() -> anyhow::Result<()> {
)
.await
{
- Err(_) => eprintln!(
- "error: Execution of {program:?} for message {p:?} timed out"
+ Err(_) => error!(
+ "Execution of {program:?} for message {p:?} timed out"
),
- Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"),
- _ => (),
+ Ok(Err(e)) => error!("error: Failed to run {program:?}: {e:?}"),
+ Ok(Ok(c)) => {
+ if !c.success() {
+ if let Some(code) = c.code() {
+ if code != 0 {
+ warn!("Program exited with non-zero exit code: {code}")
+ } else {
+ debug!("Program exited successfully.");
+ }
+ } else if let Some(signal) = c.signal() {
+ let core_dumped = if c.core_dumped() { " (core dumped)" } else { "" };
+ warn!("Program received signal: {signal}{core_dumped}");
+ }
+ }
+ },
}
});
}