From 1fcd7d38efbcc3a08d8ebd13f8e349b4a4c515fa Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 1 Oct 2021 22:10:01 +0600 Subject: [PATCH] Serialize control message handling --- CHANGES.md | 4 +++ Cargo.toml | 2 +- src/v3/client/dispatcher.rs | 35 +++++++++++-------- src/v3/dispatcher.rs | 40 +++++++++++++++------- src/v3/server.rs | 2 +- src/v5/client/dispatcher.rs | 68 +++++++++++++++++-------------------- src/v5/dispatcher.rs | 44 ++++++++++++++++-------- src/v5/server.rs | 5 ++- 8 files changed, 119 insertions(+), 81 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 285bee1b..9974ff57 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.7.2] - 2021-10-01 + +* Serialize control message handling + ## [0.7.1] - 2021-09-18 * Allow to extract error from control message diff --git a/Cargo.toml b/Cargo.toml index 84e248d1..74104138 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "0.7.1" +version = "0.7.2" authors = ["ntex contributors "] description = "Client and Server framework for MQTT v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" diff --git a/src/v3/client/dispatcher.rs b/src/v3/client/dispatcher.rs index 1367b433..fca1591e 100644 --- a/src/v3/client/dispatcher.rs +++ b/src/v3/client/dispatcher.rs @@ -3,7 +3,7 @@ use std::task::{Context, Poll}; use std::{future::Future, marker::PhantomData, num::NonZeroU16, pin::Pin, rc::Rc}; use ntex::service::Service; -use ntex::util::{inflight::InFlightService, Either, HashSet, Ready}; +use ntex::util::{buffer::BufferService, inflight::InFlightService, Either, HashSet, Ready}; use crate::v3::shared::{Ack, MqttShared}; use crate::v3::{codec, control::ControlResultKind, publish::Publish, sink::MqttSink}; @@ -24,12 +24,19 @@ pub(super) fn create_dispatcher( > where E: 'static, - T: Service, Error = E> - + 'static, + T: Service, Error = E> + 'static, C: Service, Response = ControlResult, Error = E> + 'static, { + // limit inflight control messages + let control = BufferService::new( + 16, + || MqttError::::Disconnected, + // limit number of in-flight messages + InFlightService::new(1, control.map_err(MqttError::Service)), + ); + // limit number of in-flight messages - InFlightService::new(inflight, Dispatcher::::new(sink, publish, control)) + InFlightService::new(inflight, Dispatcher::new(sink, publish, control)) } /// Mqtt protocol dispatcher @@ -49,8 +56,8 @@ struct Inner { impl Dispatcher where - T: Service, Error = E>, - C: Service, Response = ControlResult, Error = E>, + T: Service, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { pub(crate) fn new(sink: MqttSink, publish: T, control: C) -> Self { Self { @@ -65,8 +72,8 @@ where impl Service for Dispatcher where - T: Service, Error = E>, - C: Service, Response = ControlResult, Error = E>, + T: Service, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, C::Future: 'static, E: 'static, { @@ -80,7 +87,7 @@ where fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { let res1 = self.publish.poll_ready(cx).map_err(MqttError::Service)?; - let res2 = self.inner.control.poll_ready(cx).map_err(MqttError::Service)?; + let res2 = self.inner.control.poll_ready(cx)?; if res1.is_pending() || res2.is_pending() { Poll::Pending @@ -218,8 +225,8 @@ pin_project_lite::pin_project! { impl Future for PublishResponse where - T: Service, Error = E>, - C: Service, Response = ControlResult, Error = E>, + T: Service, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { type Output = Result, MqttError>; @@ -273,7 +280,7 @@ pin_project_lite::pin_project! { impl ControlResponse where - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { fn new(msg: ControlMessage, inner: &Rc>) -> Self { Self { fut: inner.control.call(msg), inner: inner.clone(), _t: PhantomData } @@ -282,14 +289,14 @@ where impl Future for ControlResponse where - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { type Output = Result, MqttError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - let packet = match this.fut.poll(cx).map_err(MqttError::Service)? { + let packet = match this.fut.poll(cx)? { Poll::Ready(item) => match item.result { ControlResultKind::Ping => Some(codec::Packet::PingResponse), ControlResultKind::PublishAck(id) => { diff --git a/src/v3/dispatcher.rs b/src/v3/dispatcher.rs index fe712a07..ffcd6435 100644 --- a/src/v3/dispatcher.rs +++ b/src/v3/dispatcher.rs @@ -3,7 +3,9 @@ use std::task::{Context, Poll}; use std::{future::Future, marker::PhantomData, num::NonZeroU16, pin::Pin, rc::Rc}; use ntex::service::{fn_factory_with_config, Service, ServiceFactory}; -use ntex::util::{inflight::InFlightService, join, Either, HashSet, Ready}; +use ntex::util::{ + buffer::BufferService, inflight::InFlightService, join, Either, HashSet, Ready, +}; use crate::error::{MqttError, ProtocolError}; use crate::io::DispatchItem; @@ -51,11 +53,18 @@ where async move { let (publish, control) = fut.await; + let control = BufferService::new( + 16, + || MqttError::::Disconnected, + // limit number of in-flight messages + InFlightService::new(1, control?.map_err(MqttError::Service)), + ); + Ok( // limit number of in-flight messages InFlightService::new( inflight, - Dispatcher::<_, _, _, E>::new(cfg, publish?, control?), + Dispatcher::<_, _, _, E>::new(cfg, publish?, control), ), ) } @@ -80,7 +89,7 @@ struct Inner { impl Dispatcher where T: Service, - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { pub(crate) fn new(session: Session, publish: T, control: C) -> Self { let sink = session.sink().clone(); @@ -98,7 +107,7 @@ where impl Service for Dispatcher where T: Service, - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, C::Future: 'static, E: 'static, { @@ -112,7 +121,7 @@ where fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { let res1 = self.publish.poll_ready(cx).map_err(MqttError::Service)?; - let res2 = self.inner.control.poll_ready(cx).map_err(MqttError::Service)?; + let res2 = self.inner.control.poll_ready(cx)?; if res1.is_pending() || res2.is_pending() { Poll::Pending @@ -252,7 +261,7 @@ pin_project_lite::pin_project! { impl Future for PublishResponse where T: Service, - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { type Output = Result, MqttError>; @@ -300,7 +309,7 @@ pin_project_lite::pin_project! { impl ControlResponse where - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { #[allow(clippy::match_like_matches_macro)] fn new(pkt: ControlMessage, inner: &Rc>) -> Self { @@ -315,7 +324,7 @@ where impl Future for ControlResponse where - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { type Output = Result, MqttError>; @@ -350,13 +359,18 @@ where Poll::Ready(Err(err)) => { // do not handle nested error if *this.error { - Poll::Ready(Err(MqttError::Service(err))) + Poll::Ready(Err(err)) } else { // handle error from control service - *this.error = true; - let fut = this.inner.control.call(ControlMessage::error(err)); - self.as_mut().project().fut.set(fut); - self.poll(cx) + match err { + MqttError::Service(err) => { + *this.error = true; + let fut = this.inner.control.call(ControlMessage::error(err)); + self.as_mut().project().fut.set(fut); + self.poll(cx) + } + _ => Poll::Ready(Err(err)), + } } } Poll::Pending => Poll::Pending, diff --git a/src/v3/server.rs b/src/v3/server.rs index 34fb4452..8db52d0a 100644 --- a/src/v3/server.rs +++ b/src/v3/server.rs @@ -123,7 +123,7 @@ where /// Service to handle control packets /// - /// All control packets are processed sequentially, max buffered + /// All control packets are processed sequentially, max number of buffered /// control packets is 16. pub fn control(self, service: F) -> MqttServer where diff --git a/src/v5/client/dispatcher.rs b/src/v5/client/dispatcher.rs index 55f3000b..982b32e7 100644 --- a/src/v5/client/dispatcher.rs +++ b/src/v5/client/dispatcher.rs @@ -3,7 +3,7 @@ use std::task::{Context, Poll}; use std::{future::Future, marker::PhantomData, num::NonZeroU16, pin::Pin, rc::Rc}; use ntex::service::Service; -use ntex::util::{Either, HashSet, Ready}; +use ntex::util::{buffer::BufferService, inflight::InFlightService, Either, HashSet, Ready}; use crate::error::{MqttError, ProtocolError}; use crate::v5::shared::{Ack, MqttShared}; @@ -26,13 +26,16 @@ pub(super) fn create_dispatcher( > where E: From + 'static, - T: Service< - Request = Publish, - Response = ntex::util::Either, - Error = E, - > + 'static, + T: Service, Error = E> + 'static, C: Service, Response = ControlResult, Error = E> + 'static, { + let control = BufferService::new( + 16, + || MqttError::::Disconnected, + // limit number of in-flight messages + InFlightService::new(1, control.map_err(MqttError::Service)), + ); + Dispatcher::<_, _, E>::new(sink, max_receive as usize, max_topic_alias, publish, control) } @@ -59,12 +62,8 @@ struct PublishInfo { impl Dispatcher where - T: Service< - Request = Publish, - Response = ntex::util::Either, - Error = E, - >, - C: Service, Response = ControlResult, Error = E>, + T: Service, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { fn new( sink: MqttSink, @@ -93,12 +92,8 @@ where impl Service for Dispatcher where - T: Service< - Request = Publish, - Response = ntex::util::Either, - Error = E, - >, - C: Service, Response = ControlResult, Error = E>, + T: Service, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, C::Future: 'static, { type Request = DispatchItem>; @@ -111,7 +106,7 @@ where fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { let res1 = self.publish.poll_ready(cx).map_err(MqttError::Service)?; - let res2 = self.inner.control.poll_ready(cx).map_err(MqttError::Service)?; + let res2 = self.inner.control.poll_ready(cx)?; if res1.is_pending() || res2.is_pending() { Poll::Pending @@ -327,12 +322,8 @@ pin_project_lite::pin_project! { impl Future for PublishResponse where - T: Service< - Request = Publish, - Response = ntex::util::Either, - Error = E, - >, - C: Service, Response = ControlResult, Error = E>, + T: Service, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { type Output = Result, MqttError>; @@ -343,8 +334,8 @@ where PublishResponseStateProject::Publish { fut } => { let ack = match fut.poll(cx) { Poll::Ready(Ok(res)) => match res { - ntex::util::Either::Right(ack) => ack, - ntex::util::Either::Left(pkt) => { + Either::Right(ack) => ack, + Either::Left(pkt) => { this.state.set(PublishResponseState::Control { fut: ControlResponse::new( ControlMessage::publish(pkt.into_inner()), @@ -397,7 +388,7 @@ pin_project_lite::pin_project! { impl ControlResponse where - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { #[allow(clippy::match_like_matches_macro)] fn new(pkt: ControlMessage, inner: &Rc>) -> Self { @@ -423,7 +414,7 @@ where impl Future for ControlResponse where - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { type Output = Result, MqttError>; @@ -439,15 +430,20 @@ where } Poll::Ready(Err(err)) => { // do not handle nested error - if *this.error { - return Poll::Ready(Err(MqttError::Service(err))); + return if *this.error { + Poll::Ready(Err(err)) } else { // handle error from control service - *this.error = true; - let fut = this.inner.control.call(ControlMessage::error(err)); - self.as_mut().project().fut.set(fut); - return self.poll(cx); - } + match err { + MqttError::Service(err) => { + *this.error = true; + let fut = this.inner.control.call(ControlMessage::error(err)); + self.as_mut().project().fut.set(fut); + self.poll(cx) + } + _ => Poll::Ready(Err(err)), + } + }; } Poll::Pending => return Poll::Pending, }; diff --git a/src/v5/dispatcher.rs b/src/v5/dispatcher.rs index 6eb544fd..e6a396bb 100644 --- a/src/v5/dispatcher.rs +++ b/src/v5/dispatcher.rs @@ -3,7 +3,9 @@ use std::task::{Context, Poll}; use std::{convert::TryFrom, future::Future, marker, num, pin::Pin, rc::Rc}; use ntex::service::{fn_factory_with_config, Service, ServiceFactory}; -use ntex::util::{join, Either, HashSet, Ready}; +use ntex::util::{ + buffer::BufferService, inflight::InFlightService, join, Either, HashSet, Ready, +}; use crate::error::{MqttError, ProtocolError}; use crate::io::DispatchItem; @@ -52,12 +54,19 @@ where async move { let (publish, control) = fut.await; + let control = BufferService::new( + 16, + || MqttError::::Disconnected, + // limit number of in-flight messages + InFlightService::new(1, control?.map_err(MqttError::Service)), + ); + Ok(Dispatcher::<_, _, E, T::Error>::new( cfg.sink().clone(), max_receive as usize, max_topic_alias, publish?, - control?, + control, )) } }) @@ -89,7 +98,7 @@ impl Dispatcher where T: Service, PublishAck: TryFrom, - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { fn new( sink: MqttSink, @@ -121,7 +130,7 @@ impl Service for Dispatcher where T: Service, PublishAck: TryFrom, - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, C::Future: 'static, E: From + 'static, { @@ -135,7 +144,7 @@ where fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { let res1 = self.publish.poll_ready(cx).map_err(|e| MqttError::Service(e.into()))?; - let res2 = self.inner.control.poll_ready(cx).map_err(MqttError::Service)?; + let res2 = self.inner.control.poll_ready(cx)?; if res1.is_pending() || res2.is_pending() { Poll::Pending @@ -346,7 +355,7 @@ where E: From, T: Service, PublishAck: TryFrom, - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { type Output = Result, MqttError>; @@ -416,7 +425,7 @@ pin_project_lite::pin_project! { impl ControlResponse where - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { #[allow(clippy::match_like_matches_macro)] fn new(pkt: ControlMessage, inner: &Rc>) -> Self { @@ -442,7 +451,7 @@ where impl Future for ControlResponse where - C: Service, Response = ControlResult, Error = E>, + C: Service, Response = ControlResult, Error = MqttError>, { type Output = Result, MqttError>; @@ -458,15 +467,20 @@ where } Poll::Ready(Err(err)) => { // do not handle nested error - if *this.error { - return Poll::Ready(Err(MqttError::Service(err))); + return if *this.error { + Poll::Ready(Err(err)) } else { // handle error from control service - *this.error = true; - let fut = this.inner.control.call(ControlMessage::error(err)); - self.as_mut().project().fut.set(fut); - return self.poll(cx); - } + match err { + MqttError::Service(err) => { + *this.error = true; + let fut = this.inner.control.call(ControlMessage::error(err)); + self.as_mut().project().fut.set(fut); + self.poll(cx) + } + _ => Poll::Ready(Err(err)), + } + }; } Poll::Pending => return Poll::Pending, }; diff --git a/src/v5/server.rs b/src/v5/server.rs index 730146bc..d9fd35ca 100644 --- a/src/v5/server.rs +++ b/src/v5/server.rs @@ -141,7 +141,10 @@ where self } - /// Service to handle control messages + /// Service to handle control packets + /// + /// All control packets are processed sequentially, max number of buffered + /// control packets is 16. pub fn control(self, service: F) -> MqttServer where F: IntoServiceFactory,