aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorTomasz Kramkowski <tomasz@kramkow.ski>2025-07-09 21:08:09 +0100
committerTomasz Kramkowski <tomasz@kramkow.ski>2025-07-09 21:08:09 +0100
commit238bd1f811e8e1285b12fb11554a5ab7890452d4 (patch)
tree5f101d854ec8408f040b0c0f721023fc7ba27ee2 /src
parentdf7ebe827895e46caf15bc5739bcb6fba5d706f5 (diff)
downloadmqttr-238bd1f811e8e1285b12fb11554a5ab7890452d4.tar.gz
mqttr-238bd1f811e8e1285b12fb11554a5ab7890452d4.tar.xz
mqttr-238bd1f811e8e1285b12fb11554a5ab7890452d4.zip
Use moro local to reduce allocations
This means that we can also use Rc instead of Arc (which should have been used instead of re-allocating in the first place).
Diffstat (limited to 'src')
-rw-r--r--src/main.rs56
1 files changed, 29 insertions, 27 deletions
diff --git a/src/main.rs b/src/main.rs
index 68f62dc..072f5eb 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,7 +3,7 @@
// TODO: Log levels
-use std::{path::PathBuf, process::Stdio};
+use std::{path::PathBuf, process::Stdio, rc::Rc};
use anyhow::Context;
use rumqttc::{Event::Incoming, Packet, Publish, QoS};
@@ -78,35 +78,37 @@ async fn main() -> anyhow::Result<()> {
eprintln!("warning: Failed to subscribe to '{topic}': {e:?}");
}
}
- loop {
- let notification = event_loop.poll().await;
- if let Incoming(Packet::Publish(p)) = notification? {
- for (topic, route) in conf.routes.iter() {
- if !topic_match(topic, &p.topic) {
- continue;
- }
- for program in &route.programs {
- // TODO: Switch to moro_local to avoid this ewwyness
- let program = program.clone();
- let p = p.clone();
- tokio::spawn(async move {
- match timeout(
- program.timeout.unwrap_or(conf.timeout),
- run(&program.command, &p),
- )
- .await
- {
- Err(_) => eprintln!(
- "error: Execution of {program:?} for message {p:?} timed out"
- ),
- Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"),
- _ => (),
- }
- });
+ moro_local::async_scope!(|scope| -> anyhow::Result<()> {
+ loop {
+ let notification = event_loop.poll().await;
+ if let Incoming(Packet::Publish(p)) = notification? {
+ let p = Rc::new(p);
+ for (topic, route) in conf.routes.iter() {
+ if !topic_match(topic, &p.topic) {
+ continue;
+ }
+ for program in route.programs.iter() {
+ let p = p.clone();
+ scope.spawn(async move {
+ match timeout(
+ program.timeout.unwrap_or(conf.timeout),
+ run(&program.command, &p),
+ )
+ .await
+ {
+ Err(_) => eprintln!(
+ "error: Execution of {program:?} for message {p:?} timed out"
+ ),
+ Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"),
+ _ => (),
+ }
+ });
+ }
}
}
}
- }
+ })
+ .await
}
#[cfg(test)]