diff --git a/CHANGES.md b/CHANGES.md index 2d529861..cb9e7b9b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.3] - 2020-05-26 + +* Check for duplicated in-flight packet ids + ## [0.1.2] - 2020-04-20 * Update ntex @@ -32,4 +36,4 @@ ## [0.1.0] - 2019-09-25 -* Initial release \ No newline at end of file +* Initial release diff --git a/Cargo.toml b/Cargo.toml index 8f7206fc..d87ba1e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "0.1.2" +version = "0.1.3" authors = ["Nikolay Kim "] description = "MQTT v3.1.1 Client/Server framework" documentation = "https://docs.rs/ntex-mqtt" @@ -19,6 +19,7 @@ bytes = "0.5.4" derive_more = "0.99.5" either = "1.5.3" futures = "0.3.4" +fxhash = "0.2.1" pin-project = "0.4.8" log = "0.4" bytestring = "0.1.5" diff --git a/src/dispatcher.rs b/src/dispatcher.rs index afb6c2e7..78869aa9 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -7,8 +7,9 @@ use std::rc::Rc; use std::task::{Context, Poll}; use std::time::Duration; -use futures::future::{join3, ok, Either, FutureExt, LocalBoxFuture, Ready}; +use futures::future::{err, join3, ok, Either, FutureExt, LocalBoxFuture, Ready}; use futures::ready; +use fxhash::FxHashSet; use ntex::service::{boxed, fn_factory_with_config, pipeline, Service, ServiceFactory}; use ntex::util::inflight::InFlightService; use ntex::util::keepalive::KeepAliveService; @@ -75,7 +76,7 @@ where let (publish, subscribe, unsubscribe) = fut.await; // mqtt dispatcher - Ok(Dispatcher::new( + Ok(Dispatcher::<_, _, E>::new( cfg, // keep-alive connection pipeline(KeepAliveService::new(timeout, time, || { @@ -101,17 +102,18 @@ where } /// PUBLIS/SUBSCRIBER/UNSUBSCRIBER packets dispatcher -pub(crate) struct Dispatcher { +pub(crate) struct Dispatcher>, E> { session: Session, publish: T, subscribe: boxed::BoxService, unsubscribe: boxed::BoxService, disconnect: RefCell, bool)>>>, + inflight: Rc>>, } -impl Dispatcher +impl Dispatcher where - T: Service, + T: Service>, { pub(crate) fn new( session: Session, @@ -126,24 +128,25 @@ where subscribe, unsubscribe, disconnect: RefCell::new(disconnect), + inflight: Rc::new(RefCell::new(FxHashSet::default())), } } } -impl Service for Dispatcher +impl Service for Dispatcher where - T: Service, - T::Error: 'static, + T: Service>, + E: 'static, { type Request = mqtt::Packet; type Response = Option; - type Error = T::Error; + type Error = MqttError; type Future = Either< Either< - Ready>, - LocalBoxFuture<'static, Result>, + Ready>>, + LocalBoxFuture<'static, Result>>, >, - PublishResponse, + PublishResponse>, >; fn poll_ready(&self, cx: &mut Context) -> Poll> { @@ -173,9 +176,18 @@ where } mqtt::Packet::Disconnect => Either::Left(Either::Left(ok(None))), mqtt::Packet::Publish(publish) => { + let inflight = self.inflight.clone(); let packet_id = publish.packet_id; + + // check for duplicated packet id + if let Some(pid) = packet_id { + if !inflight.borrow_mut().insert(pid) { + return Either::Left(Either::Left(err(MqttError::DuplicatedPacketId))); + } + } Either::Right(PublishResponse { packet_id, + inflight, fut: self.publish.call(Publish::new(publish)), _t: PhantomData, }) @@ -214,6 +226,7 @@ pub(crate) struct PublishResponse { #[pin] fut: T, packet_id: Option, + inflight: Rc>>, _t: PhantomData, } @@ -228,6 +241,7 @@ where ready!(this.fut.poll(cx))?; if let Some(packet_id) = this.packet_id { + this.inflight.borrow_mut().remove(&packet_id); Poll::Ready(Ok(Some(mqtt::Packet::PublishAck { packet_id: *packet_id, }))) diff --git a/src/error.rs b/src/error.rs index d76a7a5b..4a7b8551 100644 --- a/src/error.rs +++ b/src/error.rs @@ -11,6 +11,8 @@ pub enum MqttError { Unexpected(crate::codec3::Packet, &'static str), /// "SUBSCRIBE, UNSUBSCRIBE, and PUBLISH (in cases where QoS > 0) Control Packets MUST contain a non-zero 16-bit Packet Identifier [MQTT-2.3.1-1]." PacketIdRequired, + /// Multiple in-flight publish packet with same package_id + DuplicatedPacketId, /// Keep alive timeout KeepAliveTimeout, /// Handshake timeout