aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md9
-rw-r--r--README.md3
-rw-r--r--src/config.rs57
-rw-r--r--src/main.rs4
4 files changed, 69 insertions, 4 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f50ad20..61e1e92 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog
+## [Unreleased]
+
+### Added
+
+* Configurable global timeout
+
## [0.2.0]
### Added
@@ -31,5 +37,6 @@
message, receive the message on stdin and the topic as an argument.
* Configure an MQTT server using a host, port and optional credentials.
-[0.2.0]: https://the-tk.com/cgit/mqttr/diff/?id=HEAD&id2=v0.1.0
+[Unreleased]: https://the-tk.com/cgit/mqttr/diff/?id=HEAD&id2=v0.2.0
+[0.2.0]: https://the-tk.com/cgit/mqttr/diff/?id=v0.2.0&id2=v0.1.0
[0.1.0]: https://the-tk.com/cgit/mqttr/tag/?h=v0.1.0
diff --git a/README.md b/README.md
index fb03266..d2af21b 100644
--- a/README.md
+++ b/README.md
@@ -37,6 +37,7 @@ host = "localhost" # MQTT server host
port = 1883 # MQTT server port
qos = "exactly-once" # Default subscription QoS
# at-least-once (=0), at-most-once (=1), exactly-once (=2)
+timeout = 10.5 # Timeout in seconds (0 means (effectively) no timeout)
# [credentials] # Uncomment to specify MQTT connection credentials
# username = "username"
# password = "password"
@@ -83,7 +84,7 @@ it being ran every time a new MQTT message is published to this topic.
## Missing Features
-* Configurable timeouts (eventually configurable per process)
+* Timeout configurable per-program
* Configurable logging
* Ability to configure programs with non-UTF-8 in paths
* Maybe config reloading on SIGHUP
diff --git a/src/config.rs b/src/config.rs
index 76f65db..c6306bc 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -37,6 +37,10 @@ fn default_id() -> String {
PROGRAM.to_string()
}
+fn default_timeout() -> Duration {
+ Duration::from_secs(60)
+}
+
#[allow(clippy::enum_variant_names)]
#[derive(Deserialize, Debug)]
#[serde(remote = "QoS", rename_all = "kebab-case")]
@@ -116,6 +120,57 @@ impl<'de> Deserialize<'de> for Route {
}
}
+pub fn deserialize_timeout<'de, D>(deserializer: D) -> Result<Duration, D::Error>
+where
+ D: Deserializer<'de>,
+{
+ struct DurationVisitor;
+
+ impl<'de> de::Visitor<'de> for DurationVisitor {
+ type Value = Duration;
+
+ fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+ formatter.write_str("a positive number")
+ }
+
+ fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
+ where
+ E: de::Error,
+ {
+ if v < 0 {
+ return Err(de::Error::invalid_value(
+ de::Unexpected::Signed(v),
+ &"a non-negative number",
+ ));
+ }
+ if v == 0 {
+ Ok(Duration::MAX)
+ } else {
+ Ok(Duration::from_secs(v as u64))
+ }
+ }
+
+ fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
+ where
+ E: de::Error,
+ {
+ if v < 0.0 {
+ return Err(de::Error::invalid_value(
+ de::Unexpected::Float(v),
+ &"a non-negative number",
+ ));
+ }
+ if v == 0.0 {
+ Ok(Duration::MAX)
+ } else {
+ Ok(Duration::from_secs_f64(v))
+ }
+ }
+ }
+
+ deserializer.deserialize_any(DurationVisitor)
+}
+
#[derive(Deserialize, Debug)]
pub struct Config {
#[serde(default = "default_host")]
@@ -124,6 +179,8 @@ pub struct Config {
pub port: u16,
#[serde(with = "QoSDef", default = "default_qos")]
pub qos: QoS,
+ #[serde(default = "default_timeout", deserialize_with = "deserialize_timeout")]
+ pub timeout: Duration,
pub credentials: Option<Credentials>,
#[serde(default = "default_id")]
pub id: String,
diff --git a/src/main.rs b/src/main.rs
index cf0dbb7..9ebd103 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,7 +3,7 @@
// TODO: Log levels
-use std::{path::PathBuf, process::Stdio, time::Duration};
+use std::{path::PathBuf, process::Stdio};
use anyhow::Context;
use rumqttc::{Event::Incoming, Packet, Publish, QoS};
@@ -90,7 +90,7 @@ async fn main() -> anyhow::Result<()> {
let program = program.clone();
let p = p.clone();
tokio::spawn(async move {
- match timeout(Duration::from_secs(60), run(&program, &p)).await {
+ match timeout(conf.timeout, run(&program, &p)).await {
Err(_) => eprintln!(
"error: Execution of {program:?} for message {p:?} timed out"
),