diff options
-rw-r--r-- | Cargo.lock | 10 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | README.md | 1 | ||||
-rw-r--r-- | src/main.rs | 28 |
4 files changed, 26 insertions, 15 deletions
@@ -440,6 +440,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + +[[package]] name = "slab" version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -515,6 +524,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", @@ -10,5 +10,5 @@ edition = "2021" anyhow = "1.0.98" rumqttc = "0.24.0" serde = { version = "1.0.219", features = ["derive"] } -tokio = { version = "1.45.1", features = ["rt", "macros"] } +tokio = { version = "1.45.1", features = ["rt", "macros", "process"] } toml = { version = "0.8.22", default-features = false, features = ["parse"] } @@ -73,7 +73,6 @@ it being ran every time a new MQTT message is published to this topic. ## Missing Features -* Asynchronous program execution * Timeouts (existent, configurable, per process configurable) * Permission checks on `mqttr.toml` if it contains a password (to ensure the password isn't being exposed) diff --git a/src/main.rs b/src/main.rs index 07288e5..c5299a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,30 +3,27 @@ // TODO: Log levels -use std::{ - io::Write, - path::PathBuf, - process::{Command, Stdio}, -}; +use std::{path::PathBuf, process::Stdio}; use anyhow::Context; use rumqttc::{Event::Incoming, Packet, Publish, QoS}; +use tokio::{io::AsyncWriteExt, process::Command}; mod config; const PROGRAM: &str = "mqttr"; -fn run(program: &[String], message: &Publish) -> anyhow::Result<()> { - // TODO: Async +async fn run(program: &[String], message: &Publish) -> anyhow::Result<()> { // TODO: Set environment variables let mut proc = Command::new(&program[0]) .args(&program[1..]) .arg(&message.topic) .stdin(Stdio::piped()) .spawn()?; - let stdin = proc.stdin.as_mut().context("No stdin")?; - stdin.write_all(&message.payload)?; - println!("{}", proc.wait()?); + let mut stdin = proc.stdin.take().context("No stdin")?; + stdin.write(&message.payload).await?; + drop(stdin); + println!("{}", proc.wait().await?); Ok(()) } @@ -84,9 +81,14 @@ async fn main() -> anyhow::Result<()> { for (topic, programs) in conf.routes.iter() { if topic_match(&topic, &p.topic) { for program in programs { - if let Err(e) = run(program, &p) { - eprintln!("error: Failed to run {program:?}: {e:?}"); - } + // TODO: Eww + let program = program.clone(); + let p = p.clone(); + tokio::spawn(async move { + if let Err(e) = run(&program, &p).await { + eprintln!("error: Failed to run {program:?}: {e:?}"); + } + }); } } } |