Skip to content

Commit

Permalink
Handle packet id in use for subscribe and unsubscribe packets
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Aug 13, 2020
1 parent 567a0f7 commit 02ea426
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 67 deletions.
4 changes: 3 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

## [0.3.2] - 2020-08-13

* v5: Respond with PublishAck for PacketIdentifierInUse error instead of disconnect
* v5: Handle packet id in use for publish, subscribe and unsubscribe packets

* v5: Handle 16 concurrent control service requests

## [0.3.1] - 2020-08-12

Expand Down
195 changes: 129 additions & 66 deletions src/v5/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use futures::future::{join, ok, Either, FutureExt, Ready};
use futures::ready;
use fxhash::FxHashSet;
use ntex::service::{fn_factory_with_config, Service, ServiceFactory};
use ntex::util::inflight::InFlightService;
use ntex::util::order::{InOrder, InOrderError};
use ntex::util::{buffer::BufferService, inflight::InFlightService};

use crate::error::{MqttError, ProtocolError};
use crate::framed::DispatcherError;
Expand Down Expand Up @@ -56,29 +56,28 @@ where
async move {
let (publish, control) = fut.await;

// mqtt dispatcher
// mqtt dispatcher.
// mqtt spec requires ack ordering, so enforce response ordering
Ok(Dispatcher::<_, _, _, E, T::Error>::new(
cfg,
max_receive as usize,
max_topic_alias,
// mqtt spec requires ack ordering, so enforce response ordering
InOrder::service(publish?).map_err(|e| match e {
InOrderError::Service(e) => either::Either::Left(e),
InOrderError::Disconnected => either::Either::Right(ProtocolError::Io(
io::Error::new(io::ErrorKind::Other, "Service dropped"),
)),
}),
BufferService::new(
16,
|| {
either::Either::Right(ProtocolError::Io(io::Error::new(
io::ErrorKind::Other,
"Service dropped",
)))
},
// limit number of in-flight messages
InFlightService::new(1, control?.map_err(either::Either::Left)),
),
InOrder::service(
// limit number of in-flight control messages
InFlightService::new(16, control?),
)
.map_err(|e| match e {
InOrderError::Service(e) => either::Either::Left(e),
InOrderError::Disconnected => either::Either::Right(ProtocolError::Io(
io::Error::new(io::ErrorKind::Other, "Service dropped"),
)),
}),
))
}
})
Expand Down Expand Up @@ -211,13 +210,15 @@ where
self.max_receive,
inner.inflight.len()
);
return Either::Right(Either::Right(ControlResponse::new(
ControlPacket::proto_error(
ProtocolError::ReceiveMaximumExceeded,
),
&self.inner,
true,
)));
return Either::Right(Either::Right(
ControlResponse::new(
ControlPacket::proto_error(
ProtocolError::ReceiveMaximumExceeded,
),
&self.inner,
)
.error(),
));
}

// check for duplicated packet id
Expand All @@ -237,21 +238,27 @@ where
// check existing topic
if publish.topic.is_empty() {
if !inner.aliases.contains(&alias) {
return Either::Right(Either::Right(ControlResponse::new(
ControlPacket::proto_error(
ProtocolError::UnknownTopicAlias,
),
&self.inner,
true,
)));
return Either::Right(Either::Right(
ControlResponse::new(
ControlPacket::proto_error(
ProtocolError::UnknownTopicAlias,
),
&self.inner,
)
.error(),
));
}
} else {
if alias.get() > self.max_topic_alias {
return Either::Right(Either::Right(ControlResponse::new(
ControlPacket::proto_error(ProtocolError::MaxTopicAlias),
&self.inner,
true,
)));
return Either::Right(Either::Right(
ControlResponse::new(
ControlPacket::proto_error(
ProtocolError::MaxTopicAlias,
),
&self.inner,
)
.error(),
));
}

// record new alias
Expand All @@ -261,7 +268,7 @@ where
}

Either::Left(PublishResponse {
packet_id,
packet_id: packet_id.map(|v| v.get()).unwrap_or(0),
inner: info,
state: PublishResponseState::Publish(
self.publish.call(Publish::new(publish)),
Expand All @@ -271,37 +278,77 @@ where
}
Ok(codec::Packet::PublishAck(packet)) => {
if !self.session.sink().complete_publish_qos1(packet.packet_id) {
Either::Right(Either::Right(ControlResponse::new(
ControlPacket::proto_error(ProtocolError::PacketIdMismatch),
&self.inner,
true,
)))
Either::Right(Either::Right(
ControlResponse::new(
ControlPacket::proto_error(ProtocolError::PacketIdMismatch),
&self.inner,
)
.error(),
))
} else {
Either::Right(Either::Left(ok(None)))
}
}
Ok(codec::Packet::Auth(pkt)) => Either::Right(Either::Right(ControlResponse::new(
ControlPacket::auth(pkt),
&self.inner,
false,
))),
Ok(codec::Packet::PingRequest) => Either::Right(Either::Right(
ControlResponse::new(ControlPacket::ping(), &self.inner, false),
ControlResponse::new(ControlPacket::ping(), &self.inner),
)),
Ok(codec::Packet::Disconnect(pkt)) => Either::Right(Either::Right(
ControlResponse::new(ControlPacket::dis(pkt), &self.inner, false),
)),
Ok(codec::Packet::Subscribe(pkt)) => Either::Right(Either::Right(
ControlResponse::new(control::Subscribe::create(pkt), &self.inner, false),
)),
Ok(codec::Packet::Unsubscribe(pkt)) => Either::Right(Either::Right(
ControlResponse::new(control::Unsubscribe::create(pkt), &self.inner, false),
ControlResponse::new(ControlPacket::dis(pkt), &self.inner),
)),
Ok(codec::Packet::Subscribe(pkt)) => {
// register inflight packet id
if !self.inner.info.borrow_mut().inflight.insert(pkt.packet_id) {
// duplicated packet id
return Either::Right(Either::Left(ok(Some(codec::Packet::SubscribeAck(
codec::SubscribeAck {
packet_id: pkt.packet_id,
status: pkt
.topic_filters
.iter()
.map(|_| codec::SubscribeAckReason::PacketIdentifierInUse)
.collect(),
properties: codec::UserProperties::new(),
reason_string: None,
},
)))));
}
let id = pkt.packet_id;
Either::Right(Either::Right(
ControlResponse::new(control::Subscribe::create(pkt), &self.inner)
.packet_id(id),
))
}
Ok(codec::Packet::Unsubscribe(pkt)) => {
// register inflight packet id
if !self.inner.info.borrow_mut().inflight.insert(pkt.packet_id) {
// duplicated packet id
return Either::Right(Either::Left(ok(Some(
codec::Packet::UnsubscribeAck(codec::UnsubscribeAck {
packet_id: pkt.packet_id,
status: pkt
.topic_filters
.iter()
.map(|_| codec::UnsubscribeAckReason::PacketIdentifierInUse)
.collect(),
properties: codec::UserProperties::new(),
reason_string: None,
}),
))));
}
let id = pkt.packet_id;
Either::Right(Either::Right(
ControlResponse::new(control::Unsubscribe::create(pkt), &self.inner)
.packet_id(id),
))
}
Ok(_) => Either::Right(Either::Left(ok(None))),
Err(e) => Either::Right(Either::Right(ControlResponse::new(
ControlPacket::proto_error(e.into()),
&self.inner,
false,
))),
}
}
Expand All @@ -312,7 +359,7 @@ pin_project_lite::pin_project! {
pub(crate) struct PublishResponse<T: Service, C: Service, E, E2> {
#[pin]
state: PublishResponseState<T, C, E>,
packet_id: Option<NonZeroU16>,
packet_id: u16,
inner: Rc<Inner<C>>,
_t: PhantomData<(E, E2)>,
}
Expand Down Expand Up @@ -352,31 +399,25 @@ where
Ok(ack) => ack,
Err(e) => {
this.state.set(PublishResponseState::Control(
ControlResponse::new(
ControlPacket::error(e),
this.inner,
true,
),
ControlResponse::new(ControlPacket::error(e), this.inner)
.error(),
));
return self.poll(cx);
}
},
either::Either::Right(e) => {
this.state.set(PublishResponseState::Control(
ControlResponse::new(
ControlPacket::proto_error(e),
this.inner,
true,
),
ControlResponse::new(ControlPacket::proto_error(e), this.inner)
.error(),
));
return self.poll(cx);
}
},
};
if let Some(packet_id) = this.packet_id {
this.inner.info.borrow_mut().inflight.remove(&packet_id);
if let Some(id) = NonZeroU16::new(*this.packet_id) {
this.inner.info.borrow_mut().inflight.remove(&id);
let ack = codec::PublishAck {
packet_id: *packet_id,
packet_id: id,
reason_code: ack.reason_code,
reason_string: ack.reason_string,
properties: ack.properties,
Expand All @@ -399,6 +440,7 @@ pin_project_lite::pin_project! {
fut: C::Future,
inner: Rc<Inner<C>>,
error: bool,
packet_id: u16,
_t: PhantomData<E>,
}
}
Expand All @@ -411,8 +453,24 @@ where
Error = either::Either<E, ProtocolError>,
>,
{
fn new(pkt: ControlPacket<E>, inner: &Rc<Inner<C>>, error: bool) -> Self {
Self { fut: inner.control.call(pkt), inner: inner.clone(), error, _t: PhantomData }
fn new(pkt: ControlPacket<E>, inner: &Rc<Inner<C>>) -> Self {
Self {
fut: inner.control.call(pkt),
inner: inner.clone(),
packet_id: 0,
error: false,
_t: PhantomData,
}
}

fn error(mut self) -> Self {
self.error = true;
self
}

fn packet_id(mut self, id: NonZeroU16) -> Self {
self.packet_id = id.get();
self
}
}

Expand All @@ -430,7 +488,12 @@ where
let this = self.as_mut().project();

let result = match ready!(this.fut.poll(cx)) {
Ok(result) => result,
Ok(result) => {
if let Some(id) = NonZeroU16::new(self.packet_id) {
self.inner.info.borrow_mut().inflight.remove(&id);
}
result
}
Err(err) => {
// do not handle nested error
if *this.error {
Expand Down

0 comments on commit 02ea426

Please sign in to comment.