From b24bb23376ac00a59d30638fed6ddec885c4f95f Mon Sep 17 00:00:00 2001 From: Tomasz Kramkowski Date: Fri, 4 Jul 2025 12:50:23 +0100 Subject: Configurable global timeout --- CHANGELOG.md | 9 ++++++++- README.md | 3 ++- src/config.rs | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 4 ++-- 4 files changed, 69 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f50ad20..61e1e92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## [Unreleased] + +### Added + +* Configurable global timeout + ## [0.2.0] ### Added @@ -31,5 +37,6 @@ message, receive the message on stdin and the topic as an argument. * Configure an MQTT server using a host, port and optional credentials. -[0.2.0]: https://the-tk.com/cgit/mqttr/diff/?id=HEAD&id2=v0.1.0 +[Unreleased]: https://the-tk.com/cgit/mqttr/diff/?id=HEAD&id2=v0.2.0 +[0.2.0]: https://the-tk.com/cgit/mqttr/diff/?id=v0.2.0&id2=v0.1.0 [0.1.0]: https://the-tk.com/cgit/mqttr/tag/?h=v0.1.0 diff --git a/README.md b/README.md index fb03266..d2af21b 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ host = "localhost" # MQTT server host port = 1883 # MQTT server port qos = "exactly-once" # Default subscription QoS # at-least-once (=0), at-most-once (=1), exactly-once (=2) +timeout = 10.5 # Timeout in seconds (0 means (effectively) no timeout) # [credentials] # Uncomment to specify MQTT connection credentials # username = "username" # password = "password" @@ -83,7 +84,7 @@ it being ran every time a new MQTT message is published to this topic. ## Missing Features -* Configurable timeouts (eventually configurable per process) +* Timeout configurable per-program * Configurable logging * Ability to configure programs with non-UTF-8 in paths * Maybe config reloading on SIGHUP diff --git a/src/config.rs b/src/config.rs index 76f65db..c6306bc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -37,6 +37,10 @@ fn default_id() -> String { PROGRAM.to_string() } +fn default_timeout() -> Duration { + Duration::from_secs(60) +} + #[allow(clippy::enum_variant_names)] #[derive(Deserialize, Debug)] #[serde(remote = "QoS", rename_all = "kebab-case")] @@ -116,6 +120,57 @@ impl<'de> Deserialize<'de> for Route { } } +pub fn deserialize_timeout<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + struct DurationVisitor; + + impl<'de> de::Visitor<'de> for DurationVisitor { + type Value = Duration; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a positive number") + } + + fn visit_i64(self, v: i64) -> Result + where + E: de::Error, + { + if v < 0 { + return Err(de::Error::invalid_value( + de::Unexpected::Signed(v), + &"a non-negative number", + )); + } + if v == 0 { + Ok(Duration::MAX) + } else { + Ok(Duration::from_secs(v as u64)) + } + } + + fn visit_f64(self, v: f64) -> Result + where + E: de::Error, + { + if v < 0.0 { + return Err(de::Error::invalid_value( + de::Unexpected::Float(v), + &"a non-negative number", + )); + } + if v == 0.0 { + Ok(Duration::MAX) + } else { + Ok(Duration::from_secs_f64(v)) + } + } + } + + deserializer.deserialize_any(DurationVisitor) +} + #[derive(Deserialize, Debug)] pub struct Config { #[serde(default = "default_host")] @@ -124,6 +179,8 @@ pub struct Config { pub port: u16, #[serde(with = "QoSDef", default = "default_qos")] pub qos: QoS, + #[serde(default = "default_timeout", deserialize_with = "deserialize_timeout")] + pub timeout: Duration, pub credentials: Option, #[serde(default = "default_id")] pub id: String, diff --git a/src/main.rs b/src/main.rs index cf0dbb7..9ebd103 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ // TODO: Log levels -use std::{path::PathBuf, process::Stdio, time::Duration}; +use std::{path::PathBuf, process::Stdio}; use anyhow::Context; use rumqttc::{Event::Incoming, Packet, Publish, QoS}; @@ -90,7 +90,7 @@ async fn main() -> anyhow::Result<()> { let program = program.clone(); let p = p.clone(); tokio::spawn(async move { - match timeout(Duration::from_secs(60), run(&program, &p)).await { + match timeout(conf.timeout, run(&program, &p)).await { Err(_) => eprintln!( "error: Execution of {program:?} for message {p:?} timed out" ), -- cgit v1.2.3-70-g09d2