aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock33
-rw-r--r--Cargo.toml1
-rw-r--r--src/main.rs56
3 files changed, 63 insertions, 27 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 27c692b..55f5214 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -24,6 +24,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
[[package]]
+name = "async-trait"
+version = "0.1.88"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "autocfg"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -105,6 +116,15 @@ dependencies = [
]
[[package]]
+name = "futures-channel"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
+dependencies = [
+ "futures-core",
+]
+
+[[package]]
name = "futures-core"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -217,10 +237,23 @@ dependencies = [
]
[[package]]
+name = "moro-local"
+version = "0.4.0"
+source = "git+https://github.com/EliteTK/moro-local.git?branch=dependency-reduction#2a01279a8a6e3fdd30c2b980551c623c8670e550"
+dependencies = [
+ "async-trait",
+ "futures-channel",
+ "futures-core",
+ "futures-util",
+ "pin-project-lite",
+]
+
+[[package]]
name = "mqttr"
version = "0.2.0"
dependencies = [
"anyhow",
+ "moro-local",
"rumqttc",
"serde",
"tokio",
diff --git a/Cargo.toml b/Cargo.toml
index b56366e..a5db89e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0.98"
+moro-local = { git = "https://github.com/EliteTK/moro-local.git", branch = "dependency-reduction" }
rumqttc = "0.24.0"
serde = { version = "1.0.219", features = ["derive"] }
tokio = { version = "1.45.1", features = ["rt", "macros", "process", "time"] }
diff --git a/src/main.rs b/src/main.rs
index 68f62dc..072f5eb 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,7 +3,7 @@
// TODO: Log levels
-use std::{path::PathBuf, process::Stdio};
+use std::{path::PathBuf, process::Stdio, rc::Rc};
use anyhow::Context;
use rumqttc::{Event::Incoming, Packet, Publish, QoS};
@@ -78,35 +78,37 @@ async fn main() -> anyhow::Result<()> {
eprintln!("warning: Failed to subscribe to '{topic}': {e:?}");
}
}
- loop {
- let notification = event_loop.poll().await;
- if let Incoming(Packet::Publish(p)) = notification? {
- for (topic, route) in conf.routes.iter() {
- if !topic_match(topic, &p.topic) {
- continue;
- }
- for program in &route.programs {
- // TODO: Switch to moro_local to avoid this ewwyness
- let program = program.clone();
- let p = p.clone();
- tokio::spawn(async move {
- match timeout(
- program.timeout.unwrap_or(conf.timeout),
- run(&program.command, &p),
- )
- .await
- {
- Err(_) => eprintln!(
- "error: Execution of {program:?} for message {p:?} timed out"
- ),
- Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"),
- _ => (),
- }
- });
+ moro_local::async_scope!(|scope| -> anyhow::Result<()> {
+ loop {
+ let notification = event_loop.poll().await;
+ if let Incoming(Packet::Publish(p)) = notification? {
+ let p = Rc::new(p);
+ for (topic, route) in conf.routes.iter() {
+ if !topic_match(topic, &p.topic) {
+ continue;
+ }
+ for program in route.programs.iter() {
+ let p = p.clone();
+ scope.spawn(async move {
+ match timeout(
+ program.timeout.unwrap_or(conf.timeout),
+ run(&program.command, &p),
+ )
+ .await
+ {
+ Err(_) => eprintln!(
+ "error: Execution of {program:?} for message {p:?} timed out"
+ ),
+ Ok(Err(e)) => eprintln!("error: Failed to run {program:?}: {e:?}"),
+ _ => (),
+ }
+ });
+ }
}
}
}
- }
+ })
+ .await
}
#[cfg(test)]