aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md5
-rw-r--r--Cargo.lock7
-rw-r--r--Cargo.toml1
-rw-r--r--README.md6
-rw-r--r--src/config.rs44
-rw-r--r--src/config/helpers.rs44
-rw-r--r--src/main.rs3
7 files changed, 84 insertions, 26 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 59d771c..b7745a6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,11 @@ SPDX-License-Identifier: CC-BY-SA-4.0
* BREAKING: The MQTT client ID is now set to the `id` config setting directly
rather than `id` followed by the PID.
+### Added
+
+* Ability to specify program paths or arguments which contain non-UTF8 sequences
+ by using `{ b64 = "BaSe64==" }` in place of a string in the config.
+
## [0.3.0] - 2025-07-10
### Added
diff --git a/Cargo.lock b/Cargo.lock
index b703991..2f8dc9c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -71,6 +71,12 @@ dependencies = [
]
[[package]]
+name = "base64"
+version = "0.22.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
+
+[[package]]
name = "bitflags"
version = "2.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -342,6 +348,7 @@ name = "mqttr"
version = "0.3.0"
dependencies = [
"anyhow",
+ "base64",
"log",
"moro-local",
"rumqttc",
diff --git a/Cargo.toml b/Cargo.toml
index 446a10f..e824f35 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0.98"
+base64 = "0.22.1"
log = { version = "0.4.27", features = ["serde"] }
moro-local = { git = "https://github.com/EliteTK/moro-local.git", branch = "dependency-reduction" }
rumqttc = "0.24.0"
diff --git a/README.md b/README.md
index f18d485..910fe66 100644
--- a/README.md
+++ b/README.md
@@ -58,10 +58,13 @@ The routes follow the following format:
]
"foo/bar/baz" = {
qos = "at-least-once", # Subscription QoS
- programs = [["/foo"], ["/bar"]]
+ programs = [["/foo"], ["/bar"], [{b64 = "L3NpbGx5/3BhdGg="}, "normal-arg"]]
}
```
+Note: If your program path or one of its arguments contains invalid UTF8, you
+can use a `{ b64 = "BaSe64==" }` in place of the argument.
+
On startup, `mqttr` will read the config file and subscribe to all the topics.
When a message is received, `mqttr` will match the message topic against the
routes and execute every program which is part of a matching route. Each program
@@ -88,7 +91,6 @@ it being ran every time a new MQTT message is published to this topic.
## Missing Features
-* Ability to configure programs with non-UTF-8 in paths
* TLS
* mTLS
* Graceful signal handling
diff --git a/src/config.rs b/src/config.rs
index d3fc75b..570994c 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -4,8 +4,8 @@
mod helpers;
use std::{
- collections::HashMap, fs::File, io::Read, os::unix::fs::PermissionsExt, path::Path,
- time::Duration,
+ collections::HashMap, ffi::OsString, fs::File, io::Read, os::unix::fs::PermissionsExt,
+ path::Path, time::Duration,
};
use anyhow::bail;
@@ -66,9 +66,7 @@ fn default_level_filter() -> LevelFilter {
#[derive(Debug, PartialEq, Clone)]
pub struct Program {
- // TODO: Figure out a way to allow arbitrary unix paths (arbitrary
- // non-unicode) without base64
- pub command: Box<[String]>,
+ pub command: Box<[OsString]>,
pub timeout: Option<Duration>,
}
@@ -128,19 +126,25 @@ pub fn load<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> {
mod tests {
use super::*;
use rumqttc::QoS;
- use std::time::Duration;
+ use std::{ffi::OsStr, os::unix::ffi::OsStrExt, time::Duration};
impl Program {
- fn new(command: Vec<&str>) -> Self {
+ fn new(command: Vec<&[u8]>) -> Self {
Program {
- command: command.into_iter().map(str::to_string).collect(),
+ command: command
+ .into_iter()
+ .map(|s| OsStr::from_bytes(s).to_owned())
+ .collect(),
timeout: None,
}
}
- fn new_with_timeout(command: Vec<&str>, timeout: Duration) -> Self {
+ fn new_with_timeout(command: Vec<&[u8]>, timeout: Duration) -> Self {
Program {
- command: command.into_iter().map(str::to_string).collect(),
+ command: command
+ .into_iter()
+ .map(|s| OsStr::from_bytes(s).to_owned())
+ .collect(),
timeout: Some(timeout),
}
}
@@ -170,7 +174,7 @@ mod tests {
{ command = ["/bin/program3", "arg"]},
], qos = "exactly-once" }
"topic/seq" = [
- ["/bin/program4", "arg"],
+ ["/bin/program4", "arg", { b64 = "//9hYmP//w==" }],
{ command = ["/bin/program5"], timeout = 1.2 },
]
"#;
@@ -196,9 +200,9 @@ mod tests {
assert_eq!(
route_map.programs,
vec![
- Program::new(vec!["/bin/program1"]),
- Program::new(vec!["/bin/program2", "arg"]),
- Program::new(vec!["/bin/program3", "arg"]),
+ Program::new(vec![b"/bin/program1"]),
+ Program::new(vec![b"/bin/program2", b"arg"]),
+ Program::new(vec![b"/bin/program3", b"arg"]),
]
.into()
);
@@ -208,8 +212,8 @@ mod tests {
assert_eq!(
route_seq.programs,
vec![
- Program::new(vec!["/bin/program4", "arg"]),
- Program::new_with_timeout(vec!["/bin/program5"], Duration::from_secs_f64(1.2)),
+ Program::new(vec![b"/bin/program4", b"arg", b"\xff\xffabc\xff\xff"]),
+ Program::new_with_timeout(vec![b"/bin/program5"], Duration::from_secs_f64(1.2)),
]
.into()
);
@@ -243,8 +247,8 @@ mod tests {
assert_eq!(
route.programs,
vec![
- Program::new(vec!["/foo/bar"]),
- Program::new(vec!["/baz/qux", "arg"])
+ Program::new(vec![b"/foo/bar"]),
+ Program::new(vec![b"/baz/qux", b"arg"])
]
.into()
);
@@ -264,14 +268,14 @@ mod tests {
let route_with_qos = config.routes.get("topic/with_qos").unwrap();
assert_eq!(
route_with_qos.programs,
- vec![Program::new(vec!["/foo/bar", "arg"])].into()
+ vec![Program::new(vec![b"/foo/bar", b"arg"])].into()
);
assert_eq!(route_with_qos.qos, Some(QoS::AtLeastOnce));
let route_without_qos = config.routes.get("topic/without_qos").unwrap();
assert_eq!(
route_without_qos.programs,
- vec![Program::new(vec!["/baz/qux"])].into()
+ vec![Program::new(vec![b"/baz/qux"])].into()
);
assert_eq!(route_without_qos.qos, None);
}
diff --git a/src/config/helpers.rs b/src/config/helpers.rs
index ddf29ad..edf4bd5 100644
--- a/src/config/helpers.rs
+++ b/src/config/helpers.rs
@@ -1,8 +1,13 @@
// SPDX-FileCopyrightText: 2025 Tomasz Kramkowski <tomasz@kramkow.ski>
// SPDX-License-Identifier: GPL-3.0-or-later
-use std::time::Duration;
+use std::{
+ ffi::{OsStr, OsString},
+ os::unix::ffi::OsStrExt,
+ time::Duration,
+};
+use base64::{engine::general_purpose::STANDARD, Engine as _};
use rumqttc::QoS;
use serde::{de, Deserialize, Deserializer};
@@ -79,12 +84,44 @@ where
Ok(helper.map(|Helper(external)| external))
}
+pub fn deserialize_box_slice_os_string<'de, D>(deserializer: D) -> Result<Box<[OsString]>, D::Error>
+where
+ D: Deserializer<'de>,
+{
+ #[derive(Deserialize)]
+ #[serde(untagged)]
+ enum Untagged {
+ String(String),
+ Base64 { b64: String },
+ }
+
+ impl TryInto<OsString> for Untagged {
+ type Error = String;
+ fn try_into(self) -> Result<OsString, Self::Error> {
+ match self {
+ Untagged::String(s) => Ok(s.into()),
+ Untagged::Base64 { b64 } => match STANDARD.decode(&b64) {
+ Err(_) => Err(b64),
+ Ok(b) => Ok(OsStr::from_bytes(&b).to_owned()),
+ },
+ }
+ }
+ }
+
+ Vec::<Untagged>::deserialize(deserializer)?
+ .into_iter()
+ .map(TryInto::<OsString>::try_into)
+ .collect::<Result<_, _>>()
+ .map_err(|e| de::Error::invalid_value(de::Unexpected::Str(&e), &"valid Base64"))
+}
+
impl<'de> Deserialize<'de> for super::Program {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
#[derive(Deserialize)]
#[serde(remote = "super::Program")]
struct Helper {
- command: Box<[String]>,
+ #[serde(deserialize_with = "deserialize_box_slice_os_string")]
+ command: Box<[OsString]>,
#[serde(default, deserialize_with = "deserialize_timeout_opt")]
timeout: Option<Duration>,
}
@@ -92,7 +129,8 @@ impl<'de> Deserialize<'de> for super::Program {
#[derive(Deserialize)]
#[serde(untagged)]
enum Untagged {
- Short(Box<[String]>),
+ #[serde(deserialize_with = "deserialize_box_slice_os_string")]
+ Short(Box<[OsString]>),
#[serde(with = "Helper")]
Full(super::Program),
}
diff --git a/src/main.rs b/src/main.rs
index c2a3797..2d20c94 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
use std::{
+ ffi::OsString,
os::unix::process::ExitStatusExt,
path::PathBuf,
process::{ExitStatus, Stdio},
@@ -18,7 +19,7 @@ mod mqtt;
const PROGRAM: &str = "mqttr";
-async fn run(program: &[String], message: &Publish) -> anyhow::Result<ExitStatus> {
+async fn run(program: &[OsString], message: &Publish) -> anyhow::Result<ExitStatus> {
debug!("Starting program {program:?} for message {message:?}");
let mut command = Command::new(&program[0]);
command