aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTomasz Kramkowski <tomasz@kramkow.ski>2025-10-16 19:14:45 +0100
committerTomasz Kramkowski <tomasz@kramkow.ski>2025-10-16 19:14:45 +0100
commitd8792c3116fe3c2014511e449d23ee9ce66d9e32 (patch)
tree8eb6e974a0910bb41a6659a4969df7844dd2e9c1
parent2ba9419409b16d0b1fdb8f8bc8b1b8579177d0d8 (diff)
downloadmqttr-d8792c3116fe3c2014511e449d23ee9ce66d9e32.tar.gz
mqttr-d8792c3116fe3c2014511e449d23ee9ce66d9e32.tar.xz
mqttr-d8792c3116fe3c2014511e449d23ee9ce66d9e32.zip
Refactor deeply nested behemoth
Now a separate function
-rw-r--r--src/main.rs83
1 files changed, 48 insertions, 35 deletions
diff --git a/src/main.rs b/src/main.rs
index a6bfbaa..e319955 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -7,13 +7,17 @@ use std::{
path::PathBuf,
process::{ExitStatus, Stdio},
rc::Rc,
+ time::Duration,
};
use anyhow::Context;
use log::{debug, error, trace, warn};
+use moro_local::Scope;
use rumqttc::{Event::Incoming, Packet, Publish, QoS};
use tokio::{io::AsyncWriteExt, process::Command, time::timeout};
+use crate::config::Program;
+
mod config;
mod mqtt;
@@ -43,6 +47,49 @@ async fn run(program: &[OsString], message: &Publish) -> anyhow::Result<ExitStat
Ok(result)
}
+fn run_route_programs<'a, R>(
+ scope: &'a Scope<'a, '_, R>,
+ programs: &'a [Program],
+ message: &Publish,
+ default_timeout: Duration,
+) {
+ for program in programs.iter() {
+ let p = message.clone();
+ scope.spawn(async move {
+ // TODO: BUG: This won't guarentee the process gets
+ // killed. kill_on_drop itself also has problems.
+ // Need to handle this properly manually.
+ // TODO: Also should use cancellation tokens.
+ match timeout(
+ program.timeout.unwrap_or(default_timeout),
+ run(&program.command, &p),
+ )
+ .await
+ {
+ Err(_) => error!("Execution of {program:?} for message {p:?} timed out"),
+ Ok(Err(e)) => error!("error: Failed to run {program:?}: {e:?}"),
+ Ok(Ok(c)) if !c.success() => {
+ if let Some(code) = c.code() {
+ if code != 0 {
+ warn!("Program exited with non-zero exit code: {code}")
+ } else {
+ debug!("Program exited successfully.");
+ }
+ } else if let Some(signal) = c.signal() {
+ let core_dumped = if c.core_dumped() {
+ " (core dumped)"
+ } else {
+ ""
+ };
+ warn!("Program received signal: {signal}{core_dumped}");
+ }
+ }
+ Ok(Ok(_)) => (),
+ }
+ });
+ }
+}
+
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let mut conf_path: PathBuf = option_env!("SYSCONFDIR").unwrap_or("/usr/local/etc").into();
@@ -81,41 +128,7 @@ async fn main() -> anyhow::Result<()> {
continue;
}
debug!("Message {p:?} matched topic {topic}");
- // TODO: Factor this out into a function
- for program in route.programs.iter() {
- let p = p.clone();
- scope.spawn(async move {
- // TODO: BUG: This won't guarentee the process gets
- // killed. kill_on_drop itself also has problems.
- // Need to handle this properly manually.
- // TODO: Also should use cancellation tokens.
- match timeout(
- program.timeout.unwrap_or(conf.timeout),
- run(&program.command, &p),
- )
- .await
- {
- Err(_) => error!(
- "Execution of {program:?} for message {p:?} timed out"
- ),
- Ok(Err(e)) => error!("error: Failed to run {program:?}: {e:?}"),
- Ok(Ok(c)) => {
- if !c.success() {
- if let Some(code) = c.code() {
- if code != 0 {
- warn!("Program exited with non-zero exit code: {code}")
- } else {
- debug!("Program exited successfully.");
- }
- } else if let Some(signal) = c.signal() {
- let core_dumped = if c.core_dumped() { " (core dumped)" } else { "" };
- warn!("Program received signal: {signal}{core_dumped}");
- }
- }
- },
- }
- });
- }
+ run_route_programs(scope, &route.programs, &p, conf.timeout);
}
}
}