diff options
author | Tomasz Kramkowski <tomasz@kramkow.ski> | 2025-07-09 21:08:09 +0100 |
---|---|---|
committer | Tomasz Kramkowski <tomasz@kramkow.ski> | 2025-07-09 21:08:09 +0100 |
commit | 238bd1f811e8e1285b12fb11554a5ab7890452d4 (patch) | |
tree | 5f101d854ec8408f040b0c0f721023fc7ba27ee2 /src/main.rs | |
parent | df7ebe827895e46caf15bc5739bcb6fba5d706f5 (diff) | |
download | mqttr-238bd1f811e8e1285b12fb11554a5ab7890452d4.tar.gz mqttr-238bd1f811e8e1285b12fb11554a5ab7890452d4.tar.xz mqttr-238bd1f811e8e1285b12fb11554a5ab7890452d4.zip |
Use moro local to reduce allocations
This means that we can also use Rc instead of Arc (which should have
been used instead of re-allocating in the first place).
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 56 |
1 files changed, 29 insertions, 27 deletions
diff --git a/src/main.rs b/src/main.rs index 68f62dc..072f5eb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ // TODO: Log levels -use std::{path::PathBuf, process::Stdio}; +use std::{path::PathBuf, process::Stdio, rc::Rc}; use anyhow::Context; use rumqttc::{Event::Incoming, Packet, Publish, QoS}; @@ -78,35 +78,37 @@ async fn main() -> anyhow::Result<()> { eprintln!("warning: Failed to subscribe to '{topic}': {e:?}"); } } - loop { - let notification = event_loop.poll().await; - if let Incoming(Packet::Publish(p)) = notification? { - for (topic, route) in conf.routes.iter() { - if !topic_match(topic, &p.topic) { - continue; - } - for program in &route.programs { - // TODO: Switch to moro_local to avoid this ewwyness - let program = program.clone(); - let p = p.clone(); - tokio::spawn(async move { - match timeout( - program.timeout.unwrap_or(conf.timeout), - run(&program.command, &p), - ) - .await - { - Err(_) => eprintln!( - "error: Execution of {program:?} for message {p:?} timed out" - ), - Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"), - _ => (), - } - }); + moro_local::async_scope!(|scope| -> anyhow::Result<()> { + loop { + let notification = event_loop.poll().await; + if let Incoming(Packet::Publish(p)) = notification? { + let p = Rc::new(p); + for (topic, route) in conf.routes.iter() { + if !topic_match(topic, &p.topic) { + continue; + } + for program in route.programs.iter() { + let p = p.clone(); + scope.spawn(async move { + match timeout( + program.timeout.unwrap_or(conf.timeout), + run(&program.command, &p), + ) + .await + { + Err(_) => eprintln!( + "error: Execution of {program:?} for message {p:?} timed out" + ), + Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"), + _ => (), + } + }); + } } } } - } + }) + .await } #[cfg(test)] |