diff options
| author | Tomasz Kramkowski <tomasz@kramkow.ski> | 2025-07-09 21:08:09 +0100 | 
|---|---|---|
| committer | Tomasz Kramkowski <tomasz@kramkow.ski> | 2025-07-09 21:08:09 +0100 | 
| commit | 238bd1f811e8e1285b12fb11554a5ab7890452d4 (patch) | |
| tree | 5f101d854ec8408f040b0c0f721023fc7ba27ee2 | |
| parent | df7ebe827895e46caf15bc5739bcb6fba5d706f5 (diff) | |
| download | mqttr-238bd1f811e8e1285b12fb11554a5ab7890452d4.tar.gz mqttr-238bd1f811e8e1285b12fb11554a5ab7890452d4.tar.xz mqttr-238bd1f811e8e1285b12fb11554a5ab7890452d4.zip  | |
Use moro local to reduce allocations
This means that we can also use Rc instead of Arc (which should have
been used instead of re-allocating in the first place).
| -rw-r--r-- | Cargo.lock | 33 | ||||
| -rw-r--r-- | Cargo.toml | 1 | ||||
| -rw-r--r-- | src/main.rs | 56 | 
3 files changed, 63 insertions, 27 deletions
@@ -24,6 +24,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"  checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"  [[package]] +name = "async-trait" +version = "0.1.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]]  name = "autocfg"  version = "1.4.0"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -105,6 +116,15 @@ dependencies = [  ]  [[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", +] + +[[package]]  name = "futures-core"  version = "0.3.31"  source = "registry+https://github.com/rust-lang/crates.io-index" @@ -217,10 +237,23 @@ dependencies = [  ]  [[package]] +name = "moro-local" +version = "0.4.0" +source = "git+https://github.com/EliteTK/moro-local.git?branch=dependency-reduction#2a01279a8a6e3fdd30c2b980551c623c8670e550" +dependencies = [ + "async-trait", + "futures-channel", + "futures-core", + "futures-util", + "pin-project-lite", +] + +[[package]]  name = "mqttr"  version = "0.2.0"  dependencies = [   "anyhow", + "moro-local",   "rumqttc",   "serde",   "tokio", @@ -8,6 +8,7 @@ edition = "2021"  [dependencies]  anyhow = "1.0.98" +moro-local = { git = "https://github.com/EliteTK/moro-local.git", branch = "dependency-reduction" }  rumqttc = "0.24.0"  serde = { version = "1.0.219", features = ["derive"] }  tokio = { version = "1.45.1", features = ["rt", "macros", "process", "time"] } diff --git a/src/main.rs b/src/main.rs index 68f62dc..072f5eb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@  // TODO: Log levels -use std::{path::PathBuf, process::Stdio}; +use std::{path::PathBuf, process::Stdio, rc::Rc};  use anyhow::Context;  use rumqttc::{Event::Incoming, Packet, Publish, QoS}; @@ -78,35 +78,37 @@ async fn main() -> anyhow::Result<()> {              eprintln!("warning: Failed to subscribe to '{topic}': {e:?}");          }      } -    loop { -        let notification = event_loop.poll().await; -        if let Incoming(Packet::Publish(p)) = notification? { -            for (topic, route) in conf.routes.iter() { -                if !topic_match(topic, &p.topic) { -                    continue; -                } -                for program in &route.programs { -                    // TODO: Switch to moro_local to avoid this ewwyness -                    let program = program.clone(); -                    let p = p.clone(); -                    tokio::spawn(async move { -                        match timeout( -                            program.timeout.unwrap_or(conf.timeout), -                            run(&program.command, &p), -                        ) -                        .await -                        { -                            Err(_) => eprintln!( -                                "error: Execution of {program:?} for message {p:?} timed out" -                            ), -                            Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"), -                            _ => (), -                        } -                    }); +    moro_local::async_scope!(|scope| -> anyhow::Result<()> { +        loop { +            let notification = event_loop.poll().await; +            if let Incoming(Packet::Publish(p)) = notification? { +                let p = Rc::new(p); +                for (topic, route) in conf.routes.iter() { +                    if !topic_match(topic, &p.topic) { +                        continue; +                    } +                    for program in route.programs.iter() { +                        let p = p.clone(); +                        scope.spawn(async move { +                            match timeout( +                                program.timeout.unwrap_or(conf.timeout), +                                run(&program.command, &p), +                            ) +                            .await +                            { +                                Err(_) => eprintln!( +                                    "error: Execution of {program:?} for message {p:?} timed out" +                                ), +                                Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"), +                                _ => (), +                            } +                        }); +                    }                  }              }          } -    } +    }) +    .await  }  #[cfg(test)]  | 
