aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorTomasz Kramkowski <tomasz@kramkow.ski>2025-06-27 17:20:46 +0100
committerTomasz Kramkowski <tomasz@kramkow.ski>2025-06-27 17:20:46 +0100
commite7746b6a8a00b7b3daa804e7a2a8f24c278507f4 (patch)
tree1b28b93b5848deaeb900aaecdf9c7c33693c0c55 /src
parent5601c78a9ff23392cfbcc62f231fcfa4e0eeb309 (diff)
downloadmqttr-e7746b6a8a00b7b3daa804e7a2a8f24c278507f4.tar.gz
mqttr-e7746b6a8a00b7b3daa804e7a2a8f24c278507f4.tar.xz
mqttr-e7746b6a8a00b7b3daa804e7a2a8f24c278507f4.zip
Asynchronous process execution
Diffstat (limited to 'src')
-rw-r--r--src/main.rs28
1 files changed, 15 insertions, 13 deletions
diff --git a/src/main.rs b/src/main.rs
index 07288e5..c5299a3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,30 +3,27 @@
// TODO: Log levels
-use std::{
- io::Write,
- path::PathBuf,
- process::{Command, Stdio},
-};
+use std::{path::PathBuf, process::Stdio};
use anyhow::Context;
use rumqttc::{Event::Incoming, Packet, Publish, QoS};
+use tokio::{io::AsyncWriteExt, process::Command};
mod config;
const PROGRAM: &str = "mqttr";
-fn run(program: &[String], message: &Publish) -> anyhow::Result<()> {
- // TODO: Async
+async fn run(program: &[String], message: &Publish) -> anyhow::Result<()> {
// TODO: Set environment variables
let mut proc = Command::new(&program[0])
.args(&program[1..])
.arg(&message.topic)
.stdin(Stdio::piped())
.spawn()?;
- let stdin = proc.stdin.as_mut().context("No stdin")?;
- stdin.write_all(&message.payload)?;
- println!("{}", proc.wait()?);
+ let mut stdin = proc.stdin.take().context("No stdin")?;
+ stdin.write(&message.payload).await?;
+ drop(stdin);
+ println!("{}", proc.wait().await?);
Ok(())
}
@@ -84,9 +81,14 @@ async fn main() -> anyhow::Result<()> {
for (topic, programs) in conf.routes.iter() {
if topic_match(&topic, &p.topic) {
for program in programs {
- if let Err(e) = run(program, &p) {
- eprintln!("error: Failed to run {program:?}: {e:?}");
- }
+ // TODO: Eww
+ let program = program.clone();
+ let p = p.clone();
+ tokio::spawn(async move {
+ if let Err(e) = run(&program, &p).await {
+ eprintln!("error: Failed to run {program:?}: {e:?}");
+ }
+ });
}
}
}