diff options
| author | Tomasz Kramkowski <tomasz@kramkow.ski> | 2025-07-04 12:50:23 +0100 | 
|---|---|---|
| committer | Tomasz Kramkowski <tomasz@kramkow.ski> | 2025-07-04 12:50:23 +0100 | 
| commit | b24bb23376ac00a59d30638fed6ddec885c4f95f (patch) | |
| tree | d6f28d31f349642554026045316ec818d371389b | |
| parent | 12d85f363249cbda7002af79c88d48ad0a36746b (diff) | |
| download | mqttr-b24bb23376ac00a59d30638fed6ddec885c4f95f.tar.gz mqttr-b24bb23376ac00a59d30638fed6ddec885c4f95f.tar.xz mqttr-b24bb23376ac00a59d30638fed6ddec885c4f95f.zip  | |
Configurable global timeout
| -rw-r--r-- | CHANGELOG.md | 9 | ||||
| -rw-r--r-- | README.md | 3 | ||||
| -rw-r--r-- | src/config.rs | 57 | ||||
| -rw-r--r-- | src/main.rs | 4 | 
4 files changed, 69 insertions, 4 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index f50ad20..61e1e92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@  # Changelog +## [Unreleased] + +### Added + +* Configurable global timeout +  ## [0.2.0]  ### Added @@ -31,5 +37,6 @@    message, receive the message on stdin and the topic as an argument.  * Configure an MQTT server using a host, port and optional credentials. -[0.2.0]: https://the-tk.com/cgit/mqttr/diff/?id=HEAD&id2=v0.1.0 +[Unreleased]: https://the-tk.com/cgit/mqttr/diff/?id=HEAD&id2=v0.2.0 +[0.2.0]: https://the-tk.com/cgit/mqttr/diff/?id=v0.2.0&id2=v0.1.0  [0.1.0]: https://the-tk.com/cgit/mqttr/tag/?h=v0.1.0 @@ -37,6 +37,7 @@ host = "localhost"      # MQTT server host  port = 1883             # MQTT server port  qos = "exactly-once"    # Default subscription QoS                          # at-least-once (=0), at-most-once (=1), exactly-once (=2) +timeout = 10.5          # Timeout in seconds (0 means (effectively) no timeout)  # [credentials]         # Uncomment to specify MQTT connection credentials  # username = "username"  # password = "password" @@ -83,7 +84,7 @@ it being ran every time a new MQTT message is published to this topic.  ## Missing Features -* Configurable timeouts (eventually configurable per process) +* Timeout configurable per-program  * Configurable logging  * Ability to configure programs with non-UTF-8 in paths  * Maybe config reloading on SIGHUP diff --git a/src/config.rs b/src/config.rs index 76f65db..c6306bc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -37,6 +37,10 @@ fn default_id() -> String {      PROGRAM.to_string()  } +fn default_timeout() -> Duration { +    Duration::from_secs(60) +} +  #[allow(clippy::enum_variant_names)]  #[derive(Deserialize, Debug)]  #[serde(remote = "QoS", rename_all = "kebab-case")] @@ -116,6 +120,57 @@ impl<'de> Deserialize<'de> for Route {      }  } +pub fn deserialize_timeout<'de, D>(deserializer: D) -> Result<Duration, D::Error> +where +    D: Deserializer<'de>, +{ +    struct DurationVisitor; + +    impl<'de> de::Visitor<'de> for DurationVisitor { +        type Value = Duration; + +        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { +            formatter.write_str("a positive number") +        } + +        fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> +        where +            E: de::Error, +        { +            if v < 0 { +                return Err(de::Error::invalid_value( +                    de::Unexpected::Signed(v), +                    &"a non-negative number", +                )); +            } +            if v == 0 { +                Ok(Duration::MAX) +            } else { +                Ok(Duration::from_secs(v as u64)) +            } +        } + +        fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E> +        where +            E: de::Error, +        { +            if v < 0.0 { +                return Err(de::Error::invalid_value( +                    de::Unexpected::Float(v), +                    &"a non-negative number", +                )); +            } +            if v == 0.0 { +                Ok(Duration::MAX) +            } else { +                Ok(Duration::from_secs_f64(v)) +            } +        } +    } + +    deserializer.deserialize_any(DurationVisitor) +} +  #[derive(Deserialize, Debug)]  pub struct Config {      #[serde(default = "default_host")] @@ -124,6 +179,8 @@ pub struct Config {      pub port: u16,      #[serde(with = "QoSDef", default = "default_qos")]      pub qos: QoS, +    #[serde(default = "default_timeout", deserialize_with = "deserialize_timeout")] +    pub timeout: Duration,      pub credentials: Option<Credentials>,      #[serde(default = "default_id")]      pub id: String, diff --git a/src/main.rs b/src/main.rs index cf0dbb7..9ebd103 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@  // TODO: Log levels -use std::{path::PathBuf, process::Stdio, time::Duration}; +use std::{path::PathBuf, process::Stdio};  use anyhow::Context;  use rumqttc::{Event::Incoming, Packet, Publish, QoS}; @@ -90,7 +90,7 @@ async fn main() -> anyhow::Result<()> {                      let program = program.clone();                      let p = p.clone();                      tokio::spawn(async move { -                        match timeout(Duration::from_secs(60), run(&program, &p)).await { +                        match timeout(conf.timeout, run(&program, &p)).await {                              Err(_) => eprintln!(                                  "error: Execution of {program:?} for message {p:?} timed out"                              ),  | 
