aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/config.rs32
-rw-r--r--src/main.rs49
2 files changed, 74 insertions, 7 deletions
diff --git a/src/config.rs b/src/config.rs
index 2ba10ec..c110825 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -7,6 +7,7 @@ use std::{
};
use anyhow::bail;
+use log::LevelFilter;
use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS};
use serde::{
de::{self, Visitor},
@@ -15,6 +16,23 @@ use serde::{
use crate::PROGRAM;
+#[derive(Deserialize, Debug, PartialEq)]
+pub struct Logging {
+ #[serde(default = "default_level_filter")]
+ pub level: LevelFilter,
+ #[serde(default)] // Off
+ pub timestamps: bool,
+}
+
+impl Default for Logging {
+ fn default() -> Self {
+ Self {
+ level: default_level_filter(),
+ timestamps: bool::default(),
+ }
+ }
+}
+
#[derive(Deserialize, Debug)]
pub struct Credentials {
pub username: String,
@@ -41,6 +59,10 @@ fn default_timeout() -> Duration {
Duration::from_secs(60)
}
+fn default_level_filter() -> LevelFilter {
+ LevelFilter::Info
+}
+
#[allow(clippy::enum_variant_names)]
#[derive(Deserialize, Debug)]
#[serde(remote = "QoS", rename_all = "kebab-case")]
@@ -248,6 +270,8 @@ pub struct Config {
pub qos: QoS,
#[serde(default = "default_timeout", deserialize_with = "deserialize_timeout")]
pub timeout: Duration,
+ #[serde(default)]
+ pub log: Logging,
pub credentials: Option<Credentials>,
#[serde(default = "default_id")]
pub id: String,
@@ -317,6 +341,10 @@ mod tests {
username = "testuser"
password = "testpassword"
+ [log]
+ level = "trace"
+ timestamps = true
+
[routes]
"topic/map" = { programs = [
["/bin/program1"],
@@ -341,6 +369,9 @@ mod tests {
assert_eq!(creds.username, "testuser");
assert_eq!(creds.password, "testpassword");
+ assert_eq!(config.log.level, LevelFilter::Trace);
+ assert_eq!(config.log.timestamps, true);
+
assert_eq!(config.routes.len(), 2);
let route_map = config.routes.get("topic/map").unwrap();
@@ -377,6 +408,7 @@ mod tests {
assert_eq!(config.id, default_id());
assert_eq!(config.timeout, default_timeout());
assert!(config.credentials.is_none());
+ assert_eq!(config.log, Logging::default());
assert!(config.routes.is_empty());
}
diff --git a/src/main.rs b/src/main.rs
index 396c66a..b071620 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,15 +1,15 @@
// SPDX-FileCopyrightText: 2025 Tomasz Kramkowski <tomasz@kramkow.ski>
// SPDX-License-Identifier: GPL-3.0-or-later
-// TODO: Log levels
-
use std::{
+ os::unix::process::ExitStatusExt,
path::PathBuf,
process::{ExitStatus, Stdio},
rc::Rc,
};
use anyhow::Context;
+use log::{debug, error, trace, warn};
use rumqttc::{Event::Incoming, Packet, Publish, QoS};
use tokio::{io::AsyncWriteExt, process::Command, time::timeout};
@@ -18,6 +18,7 @@ mod config;
const PROGRAM: &str = "mqttr";
async fn run(program: &[String], message: &Publish) -> anyhow::Result<ExitStatus> {
+ debug!("Starting program {program:?} for message {message:?}");
let mut command = Command::new(&program[0]);
command
.args(&program[1..])
@@ -29,6 +30,10 @@ async fn run(program: &[String], message: &Publish) -> anyhow::Result<ExitStatus
command.arg(format!("{}", message.pkid));
}
let mut proc = command.stdin(Stdio::piped()).spawn()?;
+ trace!(
+ "Started program {program:?} with PID {}",
+ proc.id().expect("missing PID")
+ );
let mut stdin = proc.stdin.take().context("No stdin")?;
stdin.write_all(&message.payload).await?;
drop(stdin);
@@ -76,21 +81,38 @@ async fn main() -> anyhow::Result<()> {
conf_path.push(format!("{PROGRAM}.toml"));
let conf = config::load(&conf_path)
.with_context(|| format!("Failed to load config: {:?}", &conf_path))?;
+ stderrlog::new()
+ .color(stderrlog::ColorChoice::Never)
+ .module(module_path!())
+ .verbosity(conf.log.level)
+ .timestamp(if conf.log.timestamps {
+ stderrlog::Timestamp::Millisecond
+ } else {
+ stderrlog::Timestamp::Off
+ })
+ .init()
+ .unwrap();
+ // TODO: This will print creds
+ trace!("Configuration: {conf:?}");
let (client, mut event_loop) = conf.mqtt_client();
for (topic, route) in conf.routes.iter() {
if let Err(e) = client.subscribe(topic, route.qos.unwrap_or(conf.qos)).await {
- eprintln!("warning: Failed to subscribe to '{topic}': {e:?}");
+ warn!("Failed to subscribe to '{topic}': {e:?}");
+ } else {
+ debug!("Subscribed to: '{topic}'");
}
}
moro_local::async_scope!(|scope| -> anyhow::Result<()> {
loop {
let notification = event_loop.poll().await;
if let Incoming(Packet::Publish(p)) = notification? {
+ debug!("Received message: {p:?}");
let p = Rc::new(p);
for (topic, route) in conf.routes.iter() {
if !topic_match(topic, &p.topic) {
continue;
}
+ debug!("Message {p:?} matched topic {topic}");
for program in route.programs.iter() {
let p = p.clone();
scope.spawn(async move {
@@ -100,11 +122,24 @@ async fn main() -> anyhow::Result<()> {
)
.await
{
- Err(_) => eprintln!(
- "error: Execution of {program:?} for message {p:?} timed out"
+ Err(_) => error!(
+ "Execution of {program:?} for message {p:?} timed out"
),
- Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"),
- _ => (),
+ Ok(Err(e)) => error!("error: Failed to run {program:?}: {e:?}"),
+ Ok(Ok(c)) => {
+ if !c.success() {
+ if let Some(code) = c.code() {
+ if code != 0 {
+ warn!("Program exited with non-zero exit code: {code}")
+ } else {
+ debug!("Program exited successfully.");
+ }
+ } else if let Some(signal) = c.signal() {
+ let core_dumped = if c.core_dumped() { " (core dumped)" } else { "" };
+ warn!("Program received signal: {signal}{core_dumped}");
+ }
+ }
+ },
}
});
}