Skip to content

Commit

Permalink
v3: add ControlMessage::Error and ControlMessage::ProtocolError
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Sep 7, 2021
1 parent 62bf2df commit 19fce07
Show file tree
Hide file tree
Showing 16 changed files with 407 additions and 375 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.7.0-b.10] - 2021-09-07

* v3: add ControlMessage::Error and ControlMessage::ProtocolError

## [0.7.0-b.9] - 2021-09-07

* v5: add helper methods to client control publish message
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.7.0-b.9"
version = "0.7.0-b.10"
authors = ["ntex contributors <[email protected]>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand Down
2 changes: 1 addition & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ where
> + 'static,
Cn: ServiceFactory<
Config = v3::Session<St>,
Request = v3::ControlMessage,
Request = v3::ControlMessage<C::Error>,
Response = v3::ControlResult,
> + 'static,
P: ServiceFactory<Config = v3::Session<St>, Request = v3::Publish, Response = ()>
Expand Down
10 changes: 5 additions & 5 deletions src/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,21 @@ macro_rules! matches {

for rhs in $levels {
match lhs.next() {
Some(&Level::SingleWildcard) => {
if !rhs.match_level(&Level::SingleWildcard) {
Some(&$crate::topic::Level::SingleWildcard) => {
if !rhs.match_level(&$crate::topic::Level::SingleWildcard) {
break;
}
}
Some(&Level::MultiWildcard) => {
return rhs.match_level(&Level::MultiWildcard);
Some(&$crate::topic::Level::MultiWildcard) => {
return rhs.match_level(&$crate::topic::Level::MultiWildcard);
}
Some(level) if rhs.match_level(level) => continue,
_ => return false,
}
}

match lhs.next() {
Some(&Level::MultiWildcard) => true,
Some(&$crate::topic::Level::MultiWildcard) => true,
Some(_) => false,
None => true,
}
Expand Down
97 changes: 14 additions & 83 deletions src/v3/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,31 +100,14 @@ where
MqttSink::new(self.shared.clone()),
self.max_receive,
into_service(|pkt| Ready::Ok(Either::Right(pkt))),
into_service(|msg: ControlMessage| Ready::<_, MqttError<()>>::Ok(msg.disconnect())),
into_service(|msg: ControlMessage<()>| Ready::<_, ()>::Ok(msg.disconnect())),
);

let _ = Dispatcher::with(
self.io,
self.shared.state.clone(),
self.shared.clone(),
apply_fn(dispatcher, |req: DispatchItem<Rc<MqttShared>>, srv| match req {
DispatchItem::Item(req) => Either::Left(srv.call(req)),
DispatchItem::KeepAliveTimeout => Either::Right(Ready::Err(
MqttError::Protocol(ProtocolError::KeepAliveTimeout),
)),
DispatchItem::EncoderError(e) => {
Either::Right(Ready::Err(MqttError::Protocol(ProtocolError::Encode(e))))
}
DispatchItem::DecoderError(e) => {
Either::Right(Ready::Err(MqttError::Protocol(ProtocolError::Decode(e))))
}
DispatchItem::IoError(e) => {
Either::Right(Ready::Err(MqttError::Protocol(ProtocolError::Io(e))))
}
DispatchItem::WBackPressureEnabled | DispatchItem::WBackPressureDisabled => {
Either::Right(Ready::Ok(None))
}
}),
dispatcher,
Timer::new(Millis::ONE_SEC),
)
.keepalive_timeout(Seconds::ZERO)
Expand All @@ -137,7 +120,7 @@ where
where
E: 'static,
F: IntoService<S> + 'static,
S: Service<Request = ControlMessage, Response = ControlResult, Error = E> + 'static,
S: Service<Request = ControlMessage<E>, Response = ControlResult, Error = E> + 'static,
{
if self.keepalive.non_zero() {
ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
Expand All @@ -147,31 +130,14 @@ where
MqttSink::new(self.shared.clone()),
self.max_receive,
into_service(|pkt| Ready::Ok(Either::Right(pkt))),
service.into_service().map_err(MqttError::Service),
service.into_service(),
);

Dispatcher::with(
self.io,
self.shared.state.clone(),
self.shared.clone(),
apply_fn(dispatcher, |req: DispatchItem<Rc<MqttShared>>, srv| match req {
DispatchItem::Item(req) => Either::Left(srv.call(req)),
DispatchItem::KeepAliveTimeout => Either::Right(Ready::Err(
MqttError::Protocol(ProtocolError::KeepAliveTimeout),
)),
DispatchItem::EncoderError(e) => {
Either::Right(Ready::Err(MqttError::Protocol(ProtocolError::Encode(e))))
}
DispatchItem::DecoderError(e) => {
Either::Right(Ready::Err(MqttError::Protocol(ProtocolError::Decode(e))))
}
DispatchItem::IoError(e) => {
Either::Right(Ready::Err(MqttError::Protocol(ProtocolError::Io(e))))
}
DispatchItem::WBackPressureEnabled | DispatchItem::WBackPressureDisabled => {
Either::Right(Ready::Ok(None))
}
}),
dispatcher,
Timer::new(Millis::ONE_SEC),
)
.keepalive_timeout(Seconds::ZERO)
Expand Down Expand Up @@ -222,33 +188,14 @@ where
MqttSink::new(self.shared.clone()),
self.max_receive,
dispatch(self.builder.finish(), self.handlers),
into_service(|msg: ControlMessage| {
Ready::<_, MqttError<Err>>::Ok(msg.disconnect())
}),
into_service(|msg: ControlMessage<Err>| Ready::<_, Err>::Ok(msg.disconnect())),
);

let _ = Dispatcher::with(
self.io,
self.shared.state.clone(),
self.shared.clone(),
apply_fn(dispatcher, |req: DispatchItem<Rc<MqttShared>>, srv| match req {
DispatchItem::Item(req) => Either::Left(srv.call(req)),
DispatchItem::KeepAliveTimeout => Either::Right(Ready::Err(
MqttError::Protocol(ProtocolError::KeepAliveTimeout),
)),
DispatchItem::EncoderError(e) => {
Either::Right(Ready::Err(MqttError::Protocol(ProtocolError::Encode(e))))
}
DispatchItem::DecoderError(e) => {
Either::Right(Ready::Err(MqttError::Protocol(ProtocolError::Decode(e))))
}
DispatchItem::IoError(e) => {
Either::Right(Ready::Err(MqttError::Protocol(ProtocolError::Io(e))))
}
DispatchItem::WBackPressureEnabled | DispatchItem::WBackPressureDisabled => {
Either::Right(Ready::Ok(None))
}
}),
dispatcher,
Timer::new(Millis::ONE_SEC),
)
.keepalive_timeout(Seconds::ZERO)
Expand All @@ -260,7 +207,8 @@ where
pub async fn start<F, S>(self, service: F) -> Result<(), MqttError<Err>>
where
F: IntoService<S> + 'static,
S: Service<Request = ControlMessage, Response = ControlResult, Error = Err> + 'static,
S: Service<Request = ControlMessage<Err>, Response = ControlResult, Error = Err>
+ 'static,
{
if self.keepalive.non_zero() {
ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive));
Expand All @@ -270,31 +218,14 @@ where
MqttSink::new(self.shared.clone()),
self.max_receive,
dispatch(self.builder.finish(), self.handlers),
service.into_service().map_err(MqttError::Service),
service.into_service(),
);

Dispatcher::with(
self.io,
self.shared.state.clone(),
self.shared.clone(),
apply_fn(dispatcher, |req: DispatchItem<Rc<MqttShared>>, srv| match req {
DispatchItem::Item(req) => Either::Left(srv.call(req)),
DispatchItem::KeepAliveTimeout => Either::Right(Ready::Err(
MqttError::Protocol(ProtocolError::KeepAliveTimeout),
)),
DispatchItem::EncoderError(e) => {
Either::Right(Ready::Err(MqttError::Protocol(ProtocolError::Encode(e))))
}
DispatchItem::DecoderError(e) => {
Either::Right(Ready::Err(MqttError::Protocol(ProtocolError::Decode(e))))
}
DispatchItem::IoError(e) => {
Either::Right(Ready::Err(MqttError::Protocol(ProtocolError::Io(e))))
}
DispatchItem::WBackPressureEnabled | DispatchItem::WBackPressureDisabled => {
Either::Right(Ready::Ok(None))
}
}),
dispatcher,
Timer::new(Millis::ONE_SEC),
)
.keepalive_timeout(Seconds::ZERO)
Expand All @@ -306,7 +237,7 @@ where
fn dispatch<Err, PErr>(
router: Router<usize>,
handlers: Vec<Handler<PErr>>,
) -> impl Service<Request = Publish, Response = Either<(), Publish>, Error = MqttError<Err>>
) -> impl Service<Request = Publish, Response = Either<(), Publish>, Error = Err>
where
PErr: 'static,
Err: From<PErr>,
Expand All @@ -315,9 +246,9 @@ where
if let Some((idx, _info)) = router.recognize(req.topic_mut()) {
// exec handler
let fut = call(req, &handlers[*idx]);
Either::Left(async move { fut.await.map_err(MqttError::Service) })
Either::Left(async move { fut.await })
} else {
Either::Right(Ready::<_, MqttError<Err>>::Ok(Either::Right(req)))
Either::Right(Ready::<_, Err>::Ok(Either::Right(req)))
}
})
}
Expand Down
20 changes: 16 additions & 4 deletions src/v3/client/control.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
pub use crate::v3::control::{Closed, ControlResult, Disconnect};
use crate::v3::{codec, control::ControlResultKind};
pub use crate::v3::control::{Closed, ControlResult, Disconnect, Error, ProtocolError};
use crate::v3::{codec, control::ControlResultKind, error};

pub enum ControlMessage {
pub enum ControlMessage<E> {
/// Unhandled publish packet
Publish(Publish),
/// Disconnect packet
Disconnect(Disconnect),
/// Connection closed
Closed(Closed),
/// Application level error from resources and control services
Error(Error<E>),
/// Protocol level error
ProtocolError(ProtocolError),
}

impl ControlMessage {
impl<E> ControlMessage<E> {
pub(super) fn publish(pkt: codec::Publish) -> Self {
ControlMessage::Publish(Publish(pkt))
}
Expand All @@ -23,6 +27,14 @@ impl ControlMessage {
ControlMessage::Closed(Closed::new(is_error))
}

pub(super) fn error(err: E) -> Self {
ControlMessage::Error(Error::new(err))
}

pub(super) fn proto_error(err: error::ProtocolError) -> Self {
ControlMessage::ProtocolError(ProtocolError::new(err))
}

pub fn disconnect(&self) -> ControlResult {
ControlResult { result: ControlResultKind::Disconnect }
}
Expand Down
Loading

0 comments on commit 19fce07

Please sign in to comment.