diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/config.rs | 6 | ||||
| -rw-r--r-- | src/main.rs | 11 | 
2 files changed, 9 insertions, 8 deletions
| diff --git a/src/config.rs b/src/config.rs index 1190b9a..0f9cff3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,7 +3,7 @@  use std::{collections::HashMap, fs, path::Path, process, time::Duration}; -use rumqttc::{Client, Connection, MqttOptions}; +use rumqttc::{AsyncClient, EventLoop, MqttOptions};  use serde::Deserialize;  use crate::PROGRAM; @@ -41,7 +41,7 @@ pub struct Config {  }  impl Config { -    pub fn mqtt_client(&self) -> (Client, Connection) { +    pub fn mqtt_client(&self) -> (AsyncClient, EventLoop) {          let client_id = format!("{}_{}", self.id, process::id());          let mut options = MqttOptions::new(client_id, &self.host, self.port);          if let Some(credentials) = &self.credentials { @@ -49,7 +49,7 @@ impl Config {          }          options.set_keep_alive(Duration::from_secs(5));          options.set_max_packet_size(10 * 1024 * 1024, 10 * 1024 * 1024); -        Client::new(options, 10) +        AsyncClient::new(options, 10)      }  } diff --git a/src/main.rs b/src/main.rs index 14ccb7a..07288e5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -64,19 +64,21 @@ fn topic_match(filter: &str, topic: &str) -> bool {      }  } -fn main() -> anyhow::Result<()> { +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> {      let mut conf_path: PathBuf = option_env!("SYSCONFDIR").unwrap_or("/usr/local/etc").into();      conf_path.push(format!("{PROGRAM}.toml"));      let conf = config::load(&conf_path)          .with_context(|| format!("Failed to load config: {:?}", &conf_path))?; -    let (client, mut connection) = conf.mqtt_client(); +    let (client, mut event_loop) = conf.mqtt_client();      for topic in conf.routes.keys() {          // TODO: Configurable subscription QoS -        if let Err(e) = client.subscribe(topic, QoS::AtMostOnce) { +        if let Err(e) = client.subscribe(topic, QoS::AtMostOnce).await {              eprintln!("warning: Failed to subscribe to '{topic}': {e:?}");          }      } -    for notification in connection.iter() { +    loop { +        let notification = event_loop.poll().await;          match notification? {              Incoming(Packet::Publish(p)) => {                  for (topic, programs) in conf.routes.iter() { @@ -92,7 +94,6 @@ fn main() -> anyhow::Result<()> {              _ => (),          }      } -    Ok(())  }  #[cfg(test)] | 
