diff options
author | Tomasz Kramkowski <tomasz@kramkow.ski> | 2025-06-27 17:01:59 +0100 |
---|---|---|
committer | Tomasz Kramkowski <tomasz@kramkow.ski> | 2025-06-27 17:01:59 +0100 |
commit | 5601c78a9ff23392cfbcc62f231fcfa4e0eeb309 (patch) | |
tree | 90bf213c5d5da13d02065182e0dbcf3a18e3cd05 | |
parent | a38fc67f804861d1fae5211061528ae80722cbc9 (diff) | |
download | mqttr-5601c78a9ff23392cfbcc62f231fcfa4e0eeb309.tar.gz mqttr-5601c78a9ff23392cfbcc62f231fcfa4e0eeb309.tar.xz mqttr-5601c78a9ff23392cfbcc62f231fcfa4e0eeb309.zip |
Minimal changes to convert to async
-rw-r--r-- | Cargo.lock | 5 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/config.rs | 6 | ||||
-rw-r--r-- | src/main.rs | 11 |
4 files changed, 13 insertions, 10 deletions
@@ -223,6 +223,7 @@ dependencies = [ "anyhow", "rumqttc", "serde", + "tokio", "toml", ] @@ -505,9 +506,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.45.0" +version = "1.45.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165" +checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" dependencies = [ "backtrace", "bytes", @@ -10,4 +10,5 @@ edition = "2021" anyhow = "1.0.98" rumqttc = "0.24.0" serde = { version = "1.0.219", features = ["derive"] } +tokio = { version = "1.45.1", features = ["rt", "macros"] } toml = { version = "0.8.22", default-features = false, features = ["parse"] } diff --git a/src/config.rs b/src/config.rs index 1190b9a..0f9cff3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, fs, path::Path, process, time::Duration}; -use rumqttc::{Client, Connection, MqttOptions}; +use rumqttc::{AsyncClient, EventLoop, MqttOptions}; use serde::Deserialize; use crate::PROGRAM; @@ -41,7 +41,7 @@ pub struct Config { } impl Config { - pub fn mqtt_client(&self) -> (Client, Connection) { + pub fn mqtt_client(&self) -> (AsyncClient, EventLoop) { let client_id = format!("{}_{}", self.id, process::id()); let mut options = MqttOptions::new(client_id, &self.host, self.port); if let Some(credentials) = &self.credentials { @@ -49,7 +49,7 @@ impl Config { } options.set_keep_alive(Duration::from_secs(5)); options.set_max_packet_size(10 * 1024 * 1024, 10 * 1024 * 1024); - Client::new(options, 10) + AsyncClient::new(options, 10) } } diff --git a/src/main.rs b/src/main.rs index 14ccb7a..07288e5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -64,19 +64,21 @@ fn topic_match(filter: &str, topic: &str) -> bool { } } -fn main() -> anyhow::Result<()> { +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { let mut conf_path: PathBuf = option_env!("SYSCONFDIR").unwrap_or("/usr/local/etc").into(); conf_path.push(format!("{PROGRAM}.toml")); let conf = config::load(&conf_path) .with_context(|| format!("Failed to load config: {:?}", &conf_path))?; - let (client, mut connection) = conf.mqtt_client(); + let (client, mut event_loop) = conf.mqtt_client(); for topic in conf.routes.keys() { // TODO: Configurable subscription QoS - if let Err(e) = client.subscribe(topic, QoS::AtMostOnce) { + if let Err(e) = client.subscribe(topic, QoS::AtMostOnce).await { eprintln!("warning: Failed to subscribe to '{topic}': {e:?}"); } } - for notification in connection.iter() { + loop { + let notification = event_loop.poll().await; match notification? { Incoming(Packet::Publish(p)) => { for (topic, programs) in conf.routes.iter() { @@ -92,7 +94,6 @@ fn main() -> anyhow::Result<()> { _ => (), } } - Ok(()) } #[cfg(test)] |