aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorTomasz Kramkowski <tomasz@kramkow.ski>2025-06-27 17:01:59 +0100
committerTomasz Kramkowski <tomasz@kramkow.ski>2025-06-27 17:01:59 +0100
commit5601c78a9ff23392cfbcc62f231fcfa4e0eeb309 (patch)
tree90bf213c5d5da13d02065182e0dbcf3a18e3cd05 /src
parenta38fc67f804861d1fae5211061528ae80722cbc9 (diff)
downloadmqttr-5601c78a9ff23392cfbcc62f231fcfa4e0eeb309.tar.gz
mqttr-5601c78a9ff23392cfbcc62f231fcfa4e0eeb309.tar.xz
mqttr-5601c78a9ff23392cfbcc62f231fcfa4e0eeb309.zip
Minimal changes to convert to async
Diffstat (limited to 'src')
-rw-r--r--src/config.rs6
-rw-r--r--src/main.rs11
2 files changed, 9 insertions, 8 deletions
diff --git a/src/config.rs b/src/config.rs
index 1190b9a..0f9cff3 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -3,7 +3,7 @@
use std::{collections::HashMap, fs, path::Path, process, time::Duration};
-use rumqttc::{Client, Connection, MqttOptions};
+use rumqttc::{AsyncClient, EventLoop, MqttOptions};
use serde::Deserialize;
use crate::PROGRAM;
@@ -41,7 +41,7 @@ pub struct Config {
}
impl Config {
- pub fn mqtt_client(&self) -> (Client, Connection) {
+ pub fn mqtt_client(&self) -> (AsyncClient, EventLoop) {
let client_id = format!("{}_{}", self.id, process::id());
let mut options = MqttOptions::new(client_id, &self.host, self.port);
if let Some(credentials) = &self.credentials {
@@ -49,7 +49,7 @@ impl Config {
}
options.set_keep_alive(Duration::from_secs(5));
options.set_max_packet_size(10 * 1024 * 1024, 10 * 1024 * 1024);
- Client::new(options, 10)
+ AsyncClient::new(options, 10)
}
}
diff --git a/src/main.rs b/src/main.rs
index 14ccb7a..07288e5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -64,19 +64,21 @@ fn topic_match(filter: &str, topic: &str) -> bool {
}
}
-fn main() -> anyhow::Result<()> {
+#[tokio::main(flavor = "current_thread")]
+async fn main() -> anyhow::Result<()> {
let mut conf_path: PathBuf = option_env!("SYSCONFDIR").unwrap_or("/usr/local/etc").into();
conf_path.push(format!("{PROGRAM}.toml"));
let conf = config::load(&conf_path)
.with_context(|| format!("Failed to load config: {:?}", &conf_path))?;
- let (client, mut connection) = conf.mqtt_client();
+ let (client, mut event_loop) = conf.mqtt_client();
for topic in conf.routes.keys() {
// TODO: Configurable subscription QoS
- if let Err(e) = client.subscribe(topic, QoS::AtMostOnce) {
+ if let Err(e) = client.subscribe(topic, QoS::AtMostOnce).await {
eprintln!("warning: Failed to subscribe to '{topic}': {e:?}");
}
}
- for notification in connection.iter() {
+ loop {
+ let notification = event_loop.poll().await;
match notification? {
Incoming(Packet::Publish(p)) => {
for (topic, programs) in conf.routes.iter() {
@@ -92,7 +94,6 @@ fn main() -> anyhow::Result<()> {
_ => (),
}
}
- Ok(())
}
#[cfg(test)]