From e7746b6a8a00b7b3daa804e7a2a8f24c278507f4 Mon Sep 17 00:00:00 2001 From: Tomasz Kramkowski Date: Fri, 27 Jun 2025 17:20:46 +0100 Subject: Asynchronous process execution --- src/main.rs | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index 07288e5..c5299a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,30 +3,27 @@ // TODO: Log levels -use std::{ - io::Write, - path::PathBuf, - process::{Command, Stdio}, -}; +use std::{path::PathBuf, process::Stdio}; use anyhow::Context; use rumqttc::{Event::Incoming, Packet, Publish, QoS}; +use tokio::{io::AsyncWriteExt, process::Command}; mod config; const PROGRAM: &str = "mqttr"; -fn run(program: &[String], message: &Publish) -> anyhow::Result<()> { - // TODO: Async +async fn run(program: &[String], message: &Publish) -> anyhow::Result<()> { // TODO: Set environment variables let mut proc = Command::new(&program[0]) .args(&program[1..]) .arg(&message.topic) .stdin(Stdio::piped()) .spawn()?; - let stdin = proc.stdin.as_mut().context("No stdin")?; - stdin.write_all(&message.payload)?; - println!("{}", proc.wait()?); + let mut stdin = proc.stdin.take().context("No stdin")?; + stdin.write(&message.payload).await?; + drop(stdin); + println!("{}", proc.wait().await?); Ok(()) } @@ -84,9 +81,14 @@ async fn main() -> anyhow::Result<()> { for (topic, programs) in conf.routes.iter() { if topic_match(&topic, &p.topic) { for program in programs { - if let Err(e) = run(program, &p) { - eprintln!("error: Failed to run {program:?}: {e:?}"); - } + // TODO: Eww + let program = program.clone(); + let p = p.clone(); + tokio::spawn(async move { + if let Err(e) = run(&program, &p).await { + eprintln!("error: Failed to run {program:?}: {e:?}"); + } + }); } } } -- cgit v1.2.3-70-g09d2