Skip to content

Commit

Permalink
Handle packet id in use for subscribe and unsubscribe packets for v3
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Aug 13, 2020
1 parent 02ea426 commit 4f8d85f
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 33 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

* v5: Handle 16 concurrent control service requests

* v3: Handle packet id in use for subscribe and unsubscribe packets

* v3: Handle 16 concurrent control service requests

## [0.3.1] - 2020-08-12

* v5: Fix max inflight check
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.3.1"
version = "0.3.2"
authors = ["ntex contributors <[email protected]>"]
description = "MQTT Client/Server framework for v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand Down
80 changes: 58 additions & 22 deletions src/v3/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use futures::future::{err, join, ok, Either, FutureExt, Ready};
use futures::ready;
use fxhash::FxHashSet;
use ntex::service::{fn_factory_with_config, Service, ServiceFactory};
use ntex::util::buffer::BufferService;
use ntex::util::inflight::InFlightService;
use ntex::util::order::{InOrder, InOrderError};

Expand Down Expand Up @@ -65,11 +64,13 @@ where
InOrderError::Disconnected => MqttError::Disconnected,
}),
),
BufferService::new(
// limit number of in-flight control messages
InFlightService::new(
16,
|| MqttError::Disconnected,
// limit number of in-flight messages
InFlightService::new(1, control?),
InOrder::service(control?).map_err(|e| match e {
InOrderError::Service(e) => e,
InOrderError::Disconnected => MqttError::Disconnected,
}),
),
))
}
Expand Down Expand Up @@ -145,6 +146,7 @@ where
// check for duplicated packet id
if let Some(pid) = packet_id {
if !inflight.borrow_mut().insert(pid) {
log::trace!("Duplicated packet id for publish packet: {:?}", pid);
return Either::Right(Either::Left(err(MqttError::Protocol(
ProtocolError::DuplicatedPacketId,
))));
Expand All @@ -161,27 +163,45 @@ where
self.session.sink().complete_publish_qos1(packet_id);
Either::Right(Either::Left(ok(None)))
}
codec::Packet::PingRequest => Either::Right(Either::Right(ControlResponse {
fut: self.control.call(ControlPacket::ping()),
})),
codec::Packet::Disconnect => Either::Right(Either::Right(ControlResponse {
fut: self.control.call(ControlPacket::disconnect()),
})),
codec::Packet::PingRequest => Either::Right(Either::Right(ControlResponse::new(
self.control.call(ControlPacket::ping()),
&self.inflight,
))),
codec::Packet::Disconnect => Either::Right(Either::Right(ControlResponse::new(
self.control.call(ControlPacket::disconnect()),
&self.inflight,
))),
codec::Packet::Subscribe { packet_id, topic_filters } => {
Either::Right(Either::Right(ControlResponse {
fut: self.control.call(ControlPacket::Subscribe(Subscribe::new(
if !self.inflight.borrow_mut().insert(packet_id) {
log::trace!("Duplicated packet id for unsubscribe packet: {:?}", packet_id);
return Either::Right(Either::Left(err(MqttError::Protocol(
ProtocolError::DuplicatedPacketId,
))));
}

Either::Right(Either::Right(ControlResponse::new(
self.control.call(ControlPacket::Subscribe(Subscribe::new(
packet_id,
topic_filters,
))),
}))
&self.inflight,
)))
}
codec::Packet::Unsubscribe { packet_id, topic_filters } => {
Either::Right(Either::Right(ControlResponse {
fut: self.control.call(ControlPacket::Unsubscribe(Unsubscribe::new(
if !self.inflight.borrow_mut().insert(packet_id) {
log::trace!("Duplicated packet id for unsubscribe packet: {:?}", packet_id);
return Either::Right(Either::Left(err(MqttError::Protocol(
ProtocolError::DuplicatedPacketId,
))));
}

Either::Right(Either::Right(ControlResponse::new(
self.control.call(ControlPacket::Unsubscribe(Unsubscribe::new(
packet_id,
topic_filters,
))),
}))
&self.inflight,
)))
}
_ => Either::Right(Either::Left(ok(None))),
}
Expand Down Expand Up @@ -228,6 +248,16 @@ pin_project_lite::pin_project! {
{
#[pin]
fut: T,
inflight: Rc<RefCell<FxHashSet<NonZeroU16>>>,
}
}

impl<T, E> ControlResponse<T, E>
where
T: Future<Output = Result<ControlResult, MqttError<E>>>,
{
fn new(fut: T, inflight: &Rc<RefCell<FxHashSet<NonZeroU16>>>) -> Self {
Self { fut, inflight: inflight.clone() }
}
}

Expand All @@ -238,13 +268,19 @@ where
type Output = Result<Option<codec::Packet>, MqttError<E>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let packet = match ready!(self.project().fut.poll(cx))?.result {
let this = self.project();

let packet = match ready!(this.fut.poll(cx))?.result {
ControlResultKind::Ping => Some(codec::Packet::PingResponse),
ControlResultKind::Subscribe(res) => Some(codec::Packet::SubscribeAck {
status: res.codes,
packet_id: res.packet_id,
}),
ControlResultKind::Subscribe(res) => {
this.inflight.borrow_mut().remove(&res.packet_id);
Some(codec::Packet::SubscribeAck {
status: res.codes,
packet_id: res.packet_id,
})
}
ControlResultKind::Unsubscribe(res) => {
this.inflight.borrow_mut().remove(&res.packet_id);
Some(codec::Packet::UnsubscribeAck { packet_id: res.packet_id })
}
ControlResultKind::Disconnect => None,
Expand Down
20 changes: 10 additions & 10 deletions src/v5/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ where
io::Error::new(io::ErrorKind::Other, "Service dropped"),
)),
}),
InOrder::service(
// limit number of in-flight control messages
InFlightService::new(16, control?),
)
.map_err(|e| match e {
InOrderError::Service(e) => either::Either::Left(e),
InOrderError::Disconnected => either::Either::Right(ProtocolError::Io(
io::Error::new(io::ErrorKind::Other, "Service dropped"),
)),
}),
// limit number of in-flight control messages
InFlightService::new(
16,
InOrder::service(control?).map_err(|e| match e {
InOrderError::Service(e) => either::Either::Left(e),
InOrderError::Disconnected => either::Either::Right(ProtocolError::Io(
io::Error::new(io::ErrorKind::Other, "Service dropped"),
)),
}),
),
))
}
})
Expand Down

0 comments on commit 4f8d85f

Please sign in to comment.