diff options
-rw-r--r-- | CHANGELOG.md | 5 | ||||
-rw-r--r-- | Cargo.lock | 7 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | README.md | 6 | ||||
-rw-r--r-- | src/config.rs | 44 | ||||
-rw-r--r-- | src/config/helpers.rs | 44 | ||||
-rw-r--r-- | src/main.rs | 3 |
7 files changed, 84 insertions, 26 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 59d771c..b7745a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,11 @@ SPDX-License-Identifier: CC-BY-SA-4.0 * BREAKING: The MQTT client ID is now set to the `id` config setting directly rather than `id` followed by the PID. +### Added + +* Ability to specify program paths or arguments which contain non-UTF8 sequences + by using `{ b64 = "BaSe64==" }` in place of a string in the config. + ## [0.3.0] - 2025-07-10 ### Added @@ -71,6 +71,12 @@ dependencies = [ ] [[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + +[[package]] name = "bitflags" version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -342,6 +348,7 @@ name = "mqttr" version = "0.3.0" dependencies = [ "anyhow", + "base64", "log", "moro-local", "rumqttc", @@ -8,6 +8,7 @@ edition = "2021" [dependencies] anyhow = "1.0.98" +base64 = "0.22.1" log = { version = "0.4.27", features = ["serde"] } moro-local = { git = "https://github.com/EliteTK/moro-local.git", branch = "dependency-reduction" } rumqttc = "0.24.0" @@ -58,10 +58,13 @@ The routes follow the following format: ] "foo/bar/baz" = { qos = "at-least-once", # Subscription QoS - programs = [["/foo"], ["/bar"]] + programs = [["/foo"], ["/bar"], [{b64 = "L3NpbGx5/3BhdGg="}, "normal-arg"]] } ``` +Note: If your program path or one of its arguments contains invalid UTF8, you +can use a `{ b64 = "BaSe64==" }` in place of the argument. + On startup, `mqttr` will read the config file and subscribe to all the topics. When a message is received, `mqttr` will match the message topic against the routes and execute every program which is part of a matching route. Each program @@ -88,7 +91,6 @@ it being ran every time a new MQTT message is published to this topic. ## Missing Features -* Ability to configure programs with non-UTF-8 in paths * TLS * mTLS * Graceful signal handling diff --git a/src/config.rs b/src/config.rs index d3fc75b..570994c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,8 +4,8 @@ mod helpers; use std::{ - collections::HashMap, fs::File, io::Read, os::unix::fs::PermissionsExt, path::Path, - time::Duration, + collections::HashMap, ffi::OsString, fs::File, io::Read, os::unix::fs::PermissionsExt, + path::Path, time::Duration, }; use anyhow::bail; @@ -66,9 +66,7 @@ fn default_level_filter() -> LevelFilter { #[derive(Debug, PartialEq, Clone)] pub struct Program { - // TODO: Figure out a way to allow arbitrary unix paths (arbitrary - // non-unicode) without base64 - pub command: Box<[String]>, + pub command: Box<[OsString]>, pub timeout: Option<Duration>, } @@ -128,19 +126,25 @@ pub fn load<P: AsRef<Path>>(path: P) -> anyhow::Result<Config> { mod tests { use super::*; use rumqttc::QoS; - use std::time::Duration; + use std::{ffi::OsStr, os::unix::ffi::OsStrExt, time::Duration}; impl Program { - fn new(command: Vec<&str>) -> Self { + fn new(command: Vec<&[u8]>) -> Self { Program { - command: command.into_iter().map(str::to_string).collect(), + command: command + .into_iter() + .map(|s| OsStr::from_bytes(s).to_owned()) + .collect(), timeout: None, } } - fn new_with_timeout(command: Vec<&str>, timeout: Duration) -> Self { + fn new_with_timeout(command: Vec<&[u8]>, timeout: Duration) -> Self { Program { - command: command.into_iter().map(str::to_string).collect(), + command: command + .into_iter() + .map(|s| OsStr::from_bytes(s).to_owned()) + .collect(), timeout: Some(timeout), } } @@ -170,7 +174,7 @@ mod tests { { command = ["/bin/program3", "arg"]}, ], qos = "exactly-once" } "topic/seq" = [ - ["/bin/program4", "arg"], + ["/bin/program4", "arg", { b64 = "//9hYmP//w==" }], { command = ["/bin/program5"], timeout = 1.2 }, ] "#; @@ -196,9 +200,9 @@ mod tests { assert_eq!( route_map.programs, vec![ - Program::new(vec!["/bin/program1"]), - Program::new(vec!["/bin/program2", "arg"]), - Program::new(vec!["/bin/program3", "arg"]), + Program::new(vec![b"/bin/program1"]), + Program::new(vec![b"/bin/program2", b"arg"]), + Program::new(vec![b"/bin/program3", b"arg"]), ] .into() ); @@ -208,8 +212,8 @@ mod tests { assert_eq!( route_seq.programs, vec![ - Program::new(vec!["/bin/program4", "arg"]), - Program::new_with_timeout(vec!["/bin/program5"], Duration::from_secs_f64(1.2)), + Program::new(vec![b"/bin/program4", b"arg", b"\xff\xffabc\xff\xff"]), + Program::new_with_timeout(vec![b"/bin/program5"], Duration::from_secs_f64(1.2)), ] .into() ); @@ -243,8 +247,8 @@ mod tests { assert_eq!( route.programs, vec![ - Program::new(vec!["/foo/bar"]), - Program::new(vec!["/baz/qux", "arg"]) + Program::new(vec![b"/foo/bar"]), + Program::new(vec![b"/baz/qux", b"arg"]) ] .into() ); @@ -264,14 +268,14 @@ mod tests { let route_with_qos = config.routes.get("topic/with_qos").unwrap(); assert_eq!( route_with_qos.programs, - vec![Program::new(vec!["/foo/bar", "arg"])].into() + vec![Program::new(vec![b"/foo/bar", b"arg"])].into() ); assert_eq!(route_with_qos.qos, Some(QoS::AtLeastOnce)); let route_without_qos = config.routes.get("topic/without_qos").unwrap(); assert_eq!( route_without_qos.programs, - vec![Program::new(vec!["/baz/qux"])].into() + vec![Program::new(vec![b"/baz/qux"])].into() ); assert_eq!(route_without_qos.qos, None); } diff --git a/src/config/helpers.rs b/src/config/helpers.rs index ddf29ad..edf4bd5 100644 --- a/src/config/helpers.rs +++ b/src/config/helpers.rs @@ -1,8 +1,13 @@ // SPDX-FileCopyrightText: 2025 Tomasz Kramkowski <tomasz@kramkow.ski> // SPDX-License-Identifier: GPL-3.0-or-later -use std::time::Duration; +use std::{ + ffi::{OsStr, OsString}, + os::unix::ffi::OsStrExt, + time::Duration, +}; +use base64::{engine::general_purpose::STANDARD, Engine as _}; use rumqttc::QoS; use serde::{de, Deserialize, Deserializer}; @@ -79,12 +84,44 @@ where Ok(helper.map(|Helper(external)| external)) } +pub fn deserialize_box_slice_os_string<'de, D>(deserializer: D) -> Result<Box<[OsString]>, D::Error> +where + D: Deserializer<'de>, +{ + #[derive(Deserialize)] + #[serde(untagged)] + enum Untagged { + String(String), + Base64 { b64: String }, + } + + impl TryInto<OsString> for Untagged { + type Error = String; + fn try_into(self) -> Result<OsString, Self::Error> { + match self { + Untagged::String(s) => Ok(s.into()), + Untagged::Base64 { b64 } => match STANDARD.decode(&b64) { + Err(_) => Err(b64), + Ok(b) => Ok(OsStr::from_bytes(&b).to_owned()), + }, + } + } + } + + Vec::<Untagged>::deserialize(deserializer)? + .into_iter() + .map(TryInto::<OsString>::try_into) + .collect::<Result<_, _>>() + .map_err(|e| de::Error::invalid_value(de::Unexpected::Str(&e), &"valid Base64")) +} + impl<'de> Deserialize<'de> for super::Program { fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> { #[derive(Deserialize)] #[serde(remote = "super::Program")] struct Helper { - command: Box<[String]>, + #[serde(deserialize_with = "deserialize_box_slice_os_string")] + command: Box<[OsString]>, #[serde(default, deserialize_with = "deserialize_timeout_opt")] timeout: Option<Duration>, } @@ -92,7 +129,8 @@ impl<'de> Deserialize<'de> for super::Program { #[derive(Deserialize)] #[serde(untagged)] enum Untagged { - Short(Box<[String]>), + #[serde(deserialize_with = "deserialize_box_slice_os_string")] + Short(Box<[OsString]>), #[serde(with = "Helper")] Full(super::Program), } diff --git a/src/main.rs b/src/main.rs index c2a3797..2d20c94 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later use std::{ + ffi::OsString, os::unix::process::ExitStatusExt, path::PathBuf, process::{ExitStatus, Stdio}, @@ -18,7 +19,7 @@ mod mqtt; const PROGRAM: &str = "mqttr"; -async fn run(program: &[String], message: &Publish) -> anyhow::Result<ExitStatus> { +async fn run(program: &[OsString], message: &Publish) -> anyhow::Result<ExitStatus> { debug!("Starting program {program:?} for message {message:?}"); let mut command = Command::new(&program[0]); command |