From 238bd1f811e8e1285b12fb11554a5ab7890452d4 Mon Sep 17 00:00:00 2001 From: Tomasz Kramkowski Date: Wed, 9 Jul 2025 21:08:09 +0100 Subject: Use moro local to reduce allocations This means that we can also use Rc instead of Arc (which should have been used instead of re-allocating in the first place). --- Cargo.lock | 33 +++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/main.rs | 56 +++++++++++++++++++++++++++++--------------------------- 3 files changed, 63 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 27c692b..55f5214 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,17 @@ version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +[[package]] +name = "async-trait" +version = "0.1.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -104,6 +115,15 @@ dependencies = [ "spin", ] +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", +] + [[package]] name = "futures-core" version = "0.3.31" @@ -216,11 +236,24 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "moro-local" +version = "0.4.0" +source = "git+https://github.com/EliteTK/moro-local.git?branch=dependency-reduction#2a01279a8a6e3fdd30c2b980551c623c8670e550" +dependencies = [ + "async-trait", + "futures-channel", + "futures-core", + "futures-util", + "pin-project-lite", +] + [[package]] name = "mqttr" version = "0.2.0" dependencies = [ "anyhow", + "moro-local", "rumqttc", "serde", "tokio", diff --git a/Cargo.toml b/Cargo.toml index b56366e..a5db89e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] anyhow = "1.0.98" +moro-local = { git = "https://github.com/EliteTK/moro-local.git", branch = "dependency-reduction" } rumqttc = "0.24.0" serde = { version = "1.0.219", features = ["derive"] } tokio = { version = "1.45.1", features = ["rt", "macros", "process", "time"] } diff --git a/src/main.rs b/src/main.rs index 68f62dc..072f5eb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ // TODO: Log levels -use std::{path::PathBuf, process::Stdio}; +use std::{path::PathBuf, process::Stdio, rc::Rc}; use anyhow::Context; use rumqttc::{Event::Incoming, Packet, Publish, QoS}; @@ -78,35 +78,37 @@ async fn main() -> anyhow::Result<()> { eprintln!("warning: Failed to subscribe to '{topic}': {e:?}"); } } - loop { - let notification = event_loop.poll().await; - if let Incoming(Packet::Publish(p)) = notification? { - for (topic, route) in conf.routes.iter() { - if !topic_match(topic, &p.topic) { - continue; - } - for program in &route.programs { - // TODO: Switch to moro_local to avoid this ewwyness - let program = program.clone(); - let p = p.clone(); - tokio::spawn(async move { - match timeout( - program.timeout.unwrap_or(conf.timeout), - run(&program.command, &p), - ) - .await - { - Err(_) => eprintln!( - "error: Execution of {program:?} for message {p:?} timed out" - ), - Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"), - _ => (), - } - }); + moro_local::async_scope!(|scope| -> anyhow::Result<()> { + loop { + let notification = event_loop.poll().await; + if let Incoming(Packet::Publish(p)) = notification? { + let p = Rc::new(p); + for (topic, route) in conf.routes.iter() { + if !topic_match(topic, &p.topic) { + continue; + } + for program in route.programs.iter() { + let p = p.clone(); + scope.spawn(async move { + match timeout( + program.timeout.unwrap_or(conf.timeout), + run(&program.command, &p), + ) + .await + { + Err(_) => eprintln!( + "error: Execution of {program:?} for message {p:?} timed out" + ), + Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"), + _ => (), + } + }); + } } } } - } + }) + .await } #[cfg(test)] -- cgit v1.2.3-70-g09d2