diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs | 56 |
1 files changed, 29 insertions, 27 deletions
diff --git a/src/main.rs b/src/main.rs index 68f62dc..072f5eb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ // TODO: Log levels -use std::{path::PathBuf, process::Stdio}; +use std::{path::PathBuf, process::Stdio, rc::Rc}; use anyhow::Context; use rumqttc::{Event::Incoming, Packet, Publish, QoS}; @@ -78,35 +78,37 @@ async fn main() -> anyhow::Result<()> { eprintln!("warning: Failed to subscribe to '{topic}': {e:?}"); } } - loop { - let notification = event_loop.poll().await; - if let Incoming(Packet::Publish(p)) = notification? { - for (topic, route) in conf.routes.iter() { - if !topic_match(topic, &p.topic) { - continue; - } - for program in &route.programs { - // TODO: Switch to moro_local to avoid this ewwyness - let program = program.clone(); - let p = p.clone(); - tokio::spawn(async move { - match timeout( - program.timeout.unwrap_or(conf.timeout), - run(&program.command, &p), - ) - .await - { - Err(_) => eprintln!( - "error: Execution of {program:?} for message {p:?} timed out" - ), - Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"), - _ => (), - } - }); + moro_local::async_scope!(|scope| -> anyhow::Result<()> { + loop { + let notification = event_loop.poll().await; + if let Incoming(Packet::Publish(p)) = notification? { + let p = Rc::new(p); + for (topic, route) in conf.routes.iter() { + if !topic_match(topic, &p.topic) { + continue; + } + for program in route.programs.iter() { + let p = p.clone(); + scope.spawn(async move { + match timeout( + program.timeout.unwrap_or(conf.timeout), + run(&program.command, &p), + ) + .await + { + Err(_) => eprintln!( + "error: Execution of {program:?} for message {p:?} timed out" + ), + Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"), + _ => (), + } + }); + } } } } - } + }) + .await } #[cfg(test)] |