aboutsummaryrefslogtreecommitdiffstats
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs49
1 files changed, 42 insertions, 7 deletions
diff --git a/src/main.rs b/src/main.rs
index 396c66a..b071620 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,15 +1,15 @@
// SPDX-FileCopyrightText: 2025 Tomasz Kramkowski <tomasz@kramkow.ski>
// SPDX-License-Identifier: GPL-3.0-or-later
-// TODO: Log levels
-
use std::{
+ os::unix::process::ExitStatusExt,
path::PathBuf,
process::{ExitStatus, Stdio},
rc::Rc,
};
use anyhow::Context;
+use log::{debug, error, trace, warn};
use rumqttc::{Event::Incoming, Packet, Publish, QoS};
use tokio::{io::AsyncWriteExt, process::Command, time::timeout};
@@ -18,6 +18,7 @@ mod config;
const PROGRAM: &str = "mqttr";
async fn run(program: &[String], message: &Publish) -> anyhow::Result<ExitStatus> {
+ debug!("Starting program {program:?} for message {message:?}");
let mut command = Command::new(&program[0]);
command
.args(&program[1..])
@@ -29,6 +30,10 @@ async fn run(program: &[String], message: &Publish) -> anyhow::Result<ExitStatus
command.arg(format!("{}", message.pkid));
}
let mut proc = command.stdin(Stdio::piped()).spawn()?;
+ trace!(
+ "Started program {program:?} with PID {}",
+ proc.id().expect("missing PID")
+ );
let mut stdin = proc.stdin.take().context("No stdin")?;
stdin.write_all(&message.payload).await?;
drop(stdin);
@@ -76,21 +81,38 @@ async fn main() -> anyhow::Result<()> {
conf_path.push(format!("{PROGRAM}.toml"));
let conf = config::load(&conf_path)
.with_context(|| format!("Failed to load config: {:?}", &conf_path))?;
+ stderrlog::new()
+ .color(stderrlog::ColorChoice::Never)
+ .module(module_path!())
+ .verbosity(conf.log.level)
+ .timestamp(if conf.log.timestamps {
+ stderrlog::Timestamp::Millisecond
+ } else {
+ stderrlog::Timestamp::Off
+ })
+ .init()
+ .unwrap();
+ // TODO: This will print creds
+ trace!("Configuration: {conf:?}");
let (client, mut event_loop) = conf.mqtt_client();
for (topic, route) in conf.routes.iter() {
if let Err(e) = client.subscribe(topic, route.qos.unwrap_or(conf.qos)).await {
- eprintln!("warning: Failed to subscribe to '{topic}': {e:?}");
+ warn!("Failed to subscribe to '{topic}': {e:?}");
+ } else {
+ debug!("Subscribed to: '{topic}'");
}
}
moro_local::async_scope!(|scope| -> anyhow::Result<()> {
loop {
let notification = event_loop.poll().await;
if let Incoming(Packet::Publish(p)) = notification? {
+ debug!("Received message: {p:?}");
let p = Rc::new(p);
for (topic, route) in conf.routes.iter() {
if !topic_match(topic, &p.topic) {
continue;
}
+ debug!("Message {p:?} matched topic {topic}");
for program in route.programs.iter() {
let p = p.clone();
scope.spawn(async move {
@@ -100,11 +122,24 @@ async fn main() -> anyhow::Result<()> {
)
.await
{
- Err(_) => eprintln!(
- "error: Execution of {program:?} for message {p:?} timed out"
+ Err(_) => error!(
+ "Execution of {program:?} for message {p:?} timed out"
),
- Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"),
- _ => (),
+ Ok(Err(e)) => error!("error: Failed to run {program:?}: {e:?}"),
+ Ok(Ok(c)) => {
+ if !c.success() {
+ if let Some(code) = c.code() {
+ if code != 0 {
+ warn!("Program exited with non-zero exit code: {code}")
+ } else {
+ debug!("Program exited successfully.");
+ }
+ } else if let Some(signal) = c.signal() {
+ let core_dumped = if c.core_dumped() { " (core dumped)" } else { "" };
+ warn!("Program received signal: {signal}{core_dumped}");
+ }
+ }
+ },
}
});
}