diff options
| author | Tomasz Kramkowski <tomasz@kramkow.ski> | 2025-10-16 19:14:45 +0100 |
|---|---|---|
| committer | Tomasz Kramkowski <tomasz@kramkow.ski> | 2025-10-16 19:14:45 +0100 |
| commit | d8792c3116fe3c2014511e449d23ee9ce66d9e32 (patch) | |
| tree | 8eb6e974a0910bb41a6659a4969df7844dd2e9c1 | |
| parent | 2ba9419409b16d0b1fdb8f8bc8b1b8579177d0d8 (diff) | |
| download | mqttr-d8792c3116fe3c2014511e449d23ee9ce66d9e32.tar.gz mqttr-d8792c3116fe3c2014511e449d23ee9ce66d9e32.tar.xz mqttr-d8792c3116fe3c2014511e449d23ee9ce66d9e32.zip | |
Refactor deeply nested behemoth
Now a separate function
| -rw-r--r-- | src/main.rs | 83 |
1 files changed, 48 insertions, 35 deletions
diff --git a/src/main.rs b/src/main.rs index a6bfbaa..e319955 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,13 +7,17 @@ use std::{ path::PathBuf, process::{ExitStatus, Stdio}, rc::Rc, + time::Duration, }; use anyhow::Context; use log::{debug, error, trace, warn}; +use moro_local::Scope; use rumqttc::{Event::Incoming, Packet, Publish, QoS}; use tokio::{io::AsyncWriteExt, process::Command, time::timeout}; +use crate::config::Program; + mod config; mod mqtt; @@ -43,6 +47,49 @@ async fn run(program: &[OsString], message: &Publish) -> anyhow::Result<ExitStat Ok(result) } +fn run_route_programs<'a, R>( + scope: &'a Scope<'a, '_, R>, + programs: &'a [Program], + message: &Publish, + default_timeout: Duration, +) { + for program in programs.iter() { + let p = message.clone(); + scope.spawn(async move { + // TODO: BUG: This won't guarentee the process gets + // killed. kill_on_drop itself also has problems. + // Need to handle this properly manually. + // TODO: Also should use cancellation tokens. + match timeout( + program.timeout.unwrap_or(default_timeout), + run(&program.command, &p), + ) + .await + { + Err(_) => error!("Execution of {program:?} for message {p:?} timed out"), + Ok(Err(e)) => error!("error: Failed to run {program:?}: {e:?}"), + Ok(Ok(c)) if !c.success() => { + if let Some(code) = c.code() { + if code != 0 { + warn!("Program exited with non-zero exit code: {code}") + } else { + debug!("Program exited successfully."); + } + } else if let Some(signal) = c.signal() { + let core_dumped = if c.core_dumped() { + " (core dumped)" + } else { + "" + }; + warn!("Program received signal: {signal}{core_dumped}"); + } + } + Ok(Ok(_)) => (), + } + }); + } +} + #[tokio::main(flavor = "current_thread")] async fn main() -> anyhow::Result<()> { let mut conf_path: PathBuf = option_env!("SYSCONFDIR").unwrap_or("/usr/local/etc").into(); @@ -81,41 +128,7 @@ async fn main() -> anyhow::Result<()> { continue; } debug!("Message {p:?} matched topic {topic}"); - // TODO: Factor this out into a function - for program in route.programs.iter() { - let p = p.clone(); - scope.spawn(async move { - // TODO: BUG: This won't guarentee the process gets - // killed. kill_on_drop itself also has problems. - // Need to handle this properly manually. - // TODO: Also should use cancellation tokens. - match timeout( - program.timeout.unwrap_or(conf.timeout), - run(&program.command, &p), - ) - .await - { - Err(_) => error!( - "Execution of {program:?} for message {p:?} timed out" - ), - Ok(Err(e)) => error!("error: Failed to run {program:?}: {e:?}"), - Ok(Ok(c)) => { - if !c.success() { - if let Some(code) = c.code() { - if code != 0 { - warn!("Program exited with non-zero exit code: {code}") - } else { - debug!("Program exited successfully."); - } - } else if let Some(signal) = c.signal() { - let core_dumped = if c.core_dumped() { " (core dumped)" } else { "" }; - warn!("Program received signal: {signal}{core_dumped}"); - } - } - }, - } - }); - } + run_route_programs(scope, &route.programs, &p, conf.timeout); } } } |
