aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTomasz Kramkowski <tomasz@kramkow.ski>2025-06-27 17:20:46 +0100
committerTomasz Kramkowski <tomasz@kramkow.ski>2025-06-27 17:20:46 +0100
commite7746b6a8a00b7b3daa804e7a2a8f24c278507f4 (patch)
tree1b28b93b5848deaeb900aaecdf9c7c33693c0c55
parent5601c78a9ff23392cfbcc62f231fcfa4e0eeb309 (diff)
downloadmqttr-e7746b6a8a00b7b3daa804e7a2a8f24c278507f4.tar.gz
mqttr-e7746b6a8a00b7b3daa804e7a2a8f24c278507f4.tar.xz
mqttr-e7746b6a8a00b7b3daa804e7a2a8f24c278507f4.zip
Asynchronous process execution
-rw-r--r--Cargo.lock10
-rw-r--r--Cargo.toml2
-rw-r--r--README.md1
-rw-r--r--src/main.rs28
4 files changed, 26 insertions, 15 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 4fe482b..68a88f1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -440,6 +440,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
+name = "signal-hook-registry"
+version = "1.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410"
+dependencies = [
+ "libc",
+]
+
+[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -515,6 +524,7 @@ dependencies = [
"libc",
"mio",
"pin-project-lite",
+ "signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
diff --git a/Cargo.toml b/Cargo.toml
index 1dfcfab..001d835 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,5 +10,5 @@ edition = "2021"
anyhow = "1.0.98"
rumqttc = "0.24.0"
serde = { version = "1.0.219", features = ["derive"] }
-tokio = { version = "1.45.1", features = ["rt", "macros"] }
+tokio = { version = "1.45.1", features = ["rt", "macros", "process"] }
toml = { version = "0.8.22", default-features = false, features = ["parse"] }
diff --git a/README.md b/README.md
index 535c1e2..a74bc7c 100644
--- a/README.md
+++ b/README.md
@@ -73,7 +73,6 @@ it being ran every time a new MQTT message is published to this topic.
## Missing Features
-* Asynchronous program execution
* Timeouts (existent, configurable, per process configurable)
* Permission checks on `mqttr.toml` if it contains a password (to ensure the
password isn't being exposed)
diff --git a/src/main.rs b/src/main.rs
index 07288e5..c5299a3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,30 +3,27 @@
// TODO: Log levels
-use std::{
- io::Write,
- path::PathBuf,
- process::{Command, Stdio},
-};
+use std::{path::PathBuf, process::Stdio};
use anyhow::Context;
use rumqttc::{Event::Incoming, Packet, Publish, QoS};
+use tokio::{io::AsyncWriteExt, process::Command};
mod config;
const PROGRAM: &str = "mqttr";
-fn run(program: &[String], message: &Publish) -> anyhow::Result<()> {
- // TODO: Async
+async fn run(program: &[String], message: &Publish) -> anyhow::Result<()> {
// TODO: Set environment variables
let mut proc = Command::new(&program[0])
.args(&program[1..])
.arg(&message.topic)
.stdin(Stdio::piped())
.spawn()?;
- let stdin = proc.stdin.as_mut().context("No stdin")?;
- stdin.write_all(&message.payload)?;
- println!("{}", proc.wait()?);
+ let mut stdin = proc.stdin.take().context("No stdin")?;
+ stdin.write(&message.payload).await?;
+ drop(stdin);
+ println!("{}", proc.wait().await?);
Ok(())
}
@@ -84,9 +81,14 @@ async fn main() -> anyhow::Result<()> {
for (topic, programs) in conf.routes.iter() {
if topic_match(&topic, &p.topic) {
for program in programs {
- if let Err(e) = run(program, &p) {
- eprintln!("error: Failed to run {program:?}: {e:?}");
- }
+ // TODO: Eww
+ let program = program.clone();
+ let p = p.clone();
+ tokio::spawn(async move {
+ if let Err(e) = run(&program, &p).await {
+ eprintln!("error: Failed to run {program:?}: {e:?}");
+ }
+ });
}
}
}