diff --git a/CHANGES.md b/CHANGES.md index 346ab99c..444fbf42 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,10 @@ * v5: Handle 16 concurrent control service requests +* v3: Handle packet id in use for subscribe and unsubscribe packets + +* v3: Handle 16 concurrent control service requests + ## [0.3.1] - 2020-08-12 * v5: Fix max inflight check diff --git a/Cargo.toml b/Cargo.toml index 64fa23b8..ee5d7c14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "0.3.1" +version = "0.3.2" authors = ["ntex contributors "] description = "MQTT Client/Server framework for v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" diff --git a/src/v3/dispatcher.rs b/src/v3/dispatcher.rs index 858ba42f..44c42b72 100644 --- a/src/v3/dispatcher.rs +++ b/src/v3/dispatcher.rs @@ -6,7 +6,6 @@ use futures::future::{err, join, ok, Either, FutureExt, Ready}; use futures::ready; use fxhash::FxHashSet; use ntex::service::{fn_factory_with_config, Service, ServiceFactory}; -use ntex::util::buffer::BufferService; use ntex::util::inflight::InFlightService; use ntex::util::order::{InOrder, InOrderError}; @@ -65,11 +64,13 @@ where InOrderError::Disconnected => MqttError::Disconnected, }), ), - BufferService::new( + // limit number of in-flight control messages + InFlightService::new( 16, - || MqttError::Disconnected, - // limit number of in-flight messages - InFlightService::new(1, control?), + InOrder::service(control?).map_err(|e| match e { + InOrderError::Service(e) => e, + InOrderError::Disconnected => MqttError::Disconnected, + }), ), )) } @@ -145,6 +146,7 @@ where // check for duplicated packet id if let Some(pid) = packet_id { if !inflight.borrow_mut().insert(pid) { + log::trace!("Duplicated packet id for publish packet: {:?}", pid); return Either::Right(Either::Left(err(MqttError::Protocol( ProtocolError::DuplicatedPacketId, )))); @@ -161,27 +163,45 @@ where self.session.sink().complete_publish_qos1(packet_id); Either::Right(Either::Left(ok(None))) } - codec::Packet::PingRequest => Either::Right(Either::Right(ControlResponse { - fut: self.control.call(ControlPacket::ping()), - })), - codec::Packet::Disconnect => Either::Right(Either::Right(ControlResponse { - fut: self.control.call(ControlPacket::disconnect()), - })), + codec::Packet::PingRequest => Either::Right(Either::Right(ControlResponse::new( + self.control.call(ControlPacket::ping()), + &self.inflight, + ))), + codec::Packet::Disconnect => Either::Right(Either::Right(ControlResponse::new( + self.control.call(ControlPacket::disconnect()), + &self.inflight, + ))), codec::Packet::Subscribe { packet_id, topic_filters } => { - Either::Right(Either::Right(ControlResponse { - fut: self.control.call(ControlPacket::Subscribe(Subscribe::new( + if !self.inflight.borrow_mut().insert(packet_id) { + log::trace!("Duplicated packet id for unsubscribe packet: {:?}", packet_id); + return Either::Right(Either::Left(err(MqttError::Protocol( + ProtocolError::DuplicatedPacketId, + )))); + } + + Either::Right(Either::Right(ControlResponse::new( + self.control.call(ControlPacket::Subscribe(Subscribe::new( packet_id, topic_filters, ))), - })) + &self.inflight, + ))) } codec::Packet::Unsubscribe { packet_id, topic_filters } => { - Either::Right(Either::Right(ControlResponse { - fut: self.control.call(ControlPacket::Unsubscribe(Unsubscribe::new( + if !self.inflight.borrow_mut().insert(packet_id) { + log::trace!("Duplicated packet id for unsubscribe packet: {:?}", packet_id); + return Either::Right(Either::Left(err(MqttError::Protocol( + ProtocolError::DuplicatedPacketId, + )))); + } + + Either::Right(Either::Right(ControlResponse::new( + self.control.call(ControlPacket::Unsubscribe(Unsubscribe::new( packet_id, topic_filters, ))), - })) + &self.inflight, + ))) } _ => Either::Right(Either::Left(ok(None))), } @@ -228,6 +248,16 @@ pin_project_lite::pin_project! { { #[pin] fut: T, + inflight: Rc>>, + } +} + +impl ControlResponse +where + T: Future>>, +{ + fn new(fut: T, inflight: &Rc>>) -> Self { + Self { fut, inflight: inflight.clone() } } } @@ -238,13 +268,19 @@ where type Output = Result, MqttError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let packet = match ready!(self.project().fut.poll(cx))?.result { + let this = self.project(); + + let packet = match ready!(this.fut.poll(cx))?.result { ControlResultKind::Ping => Some(codec::Packet::PingResponse), - ControlResultKind::Subscribe(res) => Some(codec::Packet::SubscribeAck { - status: res.codes, - packet_id: res.packet_id, - }), + ControlResultKind::Subscribe(res) => { + this.inflight.borrow_mut().remove(&res.packet_id); + Some(codec::Packet::SubscribeAck { + status: res.codes, + packet_id: res.packet_id, + }) + } ControlResultKind::Unsubscribe(res) => { + this.inflight.borrow_mut().remove(&res.packet_id); Some(codec::Packet::UnsubscribeAck { packet_id: res.packet_id }) } ControlResultKind::Disconnect => None, diff --git a/src/v5/dispatcher.rs b/src/v5/dispatcher.rs index bfc93d7a..20c0e78c 100644 --- a/src/v5/dispatcher.rs +++ b/src/v5/dispatcher.rs @@ -68,16 +68,16 @@ where io::Error::new(io::ErrorKind::Other, "Service dropped"), )), }), - InOrder::service( - // limit number of in-flight control messages - InFlightService::new(16, control?), - ) - .map_err(|e| match e { - InOrderError::Service(e) => either::Either::Left(e), - InOrderError::Disconnected => either::Either::Right(ProtocolError::Io( - io::Error::new(io::ErrorKind::Other, "Service dropped"), - )), - }), + // limit number of in-flight control messages + InFlightService::new( + 16, + InOrder::service(control?).map_err(|e| match e { + InOrderError::Service(e) => either::Either::Left(e), + InOrderError::Disconnected => either::Either::Right(ProtocolError::Io( + io::Error::new(io::ErrorKind::Other, "Service dropped"), + )), + }), + ), )) } })