Skip to content

Commit

Permalink
update ntex
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Sep 17, 2021
1 parent 19fce07 commit dfa097e
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 65 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.7.0] - 2021-09-17

* Update ntex to 0.4

## [0.7.0-b.10] - 2021-09-07

* v3: add ControlMessage::Error and ControlMessage::ProtocolError
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.7.0-b.10"
version = "0.7.0"
authors = ["ntex contributors <[email protected]>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand All @@ -12,7 +12,7 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"]
edition = "2018"

[dependencies]
ntex = { version = "0.4.0-b.12", default-features = false }
ntex = { version = "0.4.0", default-features = false }
bitflags = "1.3"
derive_more = "0.99"
log = "0.4"
Expand All @@ -28,4 +28,4 @@ tokio-rustls = "0.22"
openssl = "0.10"
tokio-openssl = "0.6"

ntex = { version = "0.4.0-b.7", features = ["rustls", "openssl"] }
ntex = { version = "0.4.0", features = ["rustls", "openssl"] }
2 changes: 1 addition & 1 deletion examples/openssl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use ntex::pipeline_factory;
use ntex::rt::net::TcpStream;
use ntex::server::openssl::Acceptor;
use ntex::service::pipeline_factory;
use ntex_mqtt::{v3, v5, MqttError, MqttServer};
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
use tokio_openssl::SslStream;
Expand Down
2 changes: 1 addition & 1 deletion examples/rustls.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{fs::File, io::BufReader};

use ntex::pipeline_factory;
use ntex::rt::net::TcpStream;
use ntex::server::rustls::Acceptor;
use ntex::service::pipeline_factory;
use ntex_mqtt::{v3, v5, MqttError, MqttServer};
use rustls::internal::pemfile::{certs, rsa_private_keys};
use rustls::{NoClientAuth, ServerConfig};
Expand Down
2 changes: 1 addition & 1 deletion examples/session.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::future::ok;
use ntex::{fn_factory_with_config, fn_service};
use ntex::service::{fn_factory_with_config, fn_service};
use ntex_mqtt::v5::codec::PublishAckReason;
use ntex_mqtt::{v3, v5, MqttServer};

Expand Down
2 changes: 1 addition & 1 deletion examples/subs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::cell::RefCell;

use ntex::service::{fn_factory_with_config, fn_service, ServiceFactory};
use ntex::util::{ByteString, Ready};
use ntex::{fn_factory_with_config, fn_service, ServiceFactory};
use ntex_mqtt::v5::{
self, ControlMessage, ControlResult, MqttServer, Publish, PublishAck, Session,
};
Expand Down
10 changes: 9 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use derive_more::{Display, From};
use ntex::util::Either;
use std::io;
use std::{error, io};

/// Errors which can occur when attempting to handle mqtt connection.
#[derive(Debug)]
Expand Down Expand Up @@ -49,6 +49,8 @@ pub enum ProtocolError {
Io(io::Error),
}

impl error::Error for ProtocolError {}

impl<E> From<ProtocolError> for MqttError<E> {
fn from(err: ProtocolError) -> Self {
MqttError::Protocol(err)
Expand Down Expand Up @@ -98,6 +100,8 @@ pub enum DecodeError {
Utf8Error(std::str::Utf8Error),
}

impl error::Error for DecodeError {}

#[derive(Copy, Clone, Debug, Display, PartialEq, Eq, Hash)]
pub enum EncodeError {
InvalidLength,
Expand All @@ -106,6 +110,8 @@ pub enum EncodeError {
UnsupportedVersion,
}

impl error::Error for EncodeError {}

impl PartialEq for DecodeError {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
Expand Down Expand Up @@ -138,3 +144,5 @@ pub enum SendPacketError {
#[display(fmt = "Peer disconnected")]
Disconnected,
}

impl error::Error for SendPacketError {}
8 changes: 4 additions & 4 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ mod tests {
server,
BytesCodec,
State::new(),
ntex::fn_service(|msg: DispatchItem<BytesCodec>| async move {
ntex::service::fn_service(|msg: DispatchItem<BytesCodec>| async move {
sleep(time::Duration::from_millis(50)).await;
if let DispatchItem::Item(msg) = msg {
Ok::<_, ()>(Some(msg.freeze()))
Expand Down Expand Up @@ -587,7 +587,7 @@ mod tests {
server,
BytesCodec,
State::new(),
ntex::fn_service(move |msg: DispatchItem<BytesCodec>| {
ntex::service::fn_service(move |msg: DispatchItem<BytesCodec>| {
let waiter = waiter.clone();
async move {
waiter.await;
Expand Down Expand Up @@ -628,7 +628,7 @@ mod tests {
server,
BytesCodec,
st.clone(),
ntex::fn_service(|msg: DispatchItem<BytesCodec>| async move {
ntex::service::fn_service(|msg: DispatchItem<BytesCodec>| async move {
if let DispatchItem::Item(msg) = msg {
Ok::<_, ()>(Some(msg.freeze()))
} else {
Expand Down Expand Up @@ -663,7 +663,7 @@ mod tests {
server,
BytesCodec,
state.clone(),
ntex::fn_service(|_: DispatchItem<BytesCodec>| async move {
ntex::service::fn_service(|_: DispatchItem<BytesCodec>| async move {
Err::<Option<Bytes>, _>(())
}),
);
Expand Down
31 changes: 26 additions & 5 deletions src/v3/client/connection.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{future::Future, marker::PhantomData, rc::Rc, time::Instant};
use std::{fmt, future::Future, marker::PhantomData, rc::Rc, time::Instant};

use ntex::codec::{AsyncRead, AsyncWrite};
use ntex::router::{IntoPattern, Router, RouterBuilder};
use ntex::service::{apply_fn, boxed::BoxService, into_service, IntoService, Service};
use ntex::service::{apply_fn, boxed, into_service, IntoService, Service};
use ntex::time::{sleep, Millis, Seconds};
use ntex::util::{Either, Ready};

Expand All @@ -24,6 +24,17 @@ pub struct Client<Io> {
max_receive: usize,
}

impl<Io> fmt::Debug for Client<Io> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("v3::Client")
.field("keepalive", &self.keepalive)
.field("disconnect_timeout", &self.disconnect_timeout)
.field("session_present", &self.session_present)
.field("max_receive", &self.max_receive)
.finish()
}
}

impl<T> Client<T>
where
T: AsyncRead + AsyncWrite + Unpin,
Expand Down Expand Up @@ -74,7 +85,7 @@ where
{
let mut builder = Router::build();
builder.path(address, 0);
let handlers = vec![ntex::boxed::service(service.into_service())];
let handlers = vec![boxed::service(service.into_service())];

ClientRouter {
builder,
Expand Down Expand Up @@ -146,7 +157,7 @@ where
}
}

type Handler<E> = BoxService<Publish, (), E>;
type Handler<E> = boxed::BoxService<Publish, (), E>;

/// Mqtt client with routing capabilities
pub struct ClientRouter<Io, Err, PErr> {
Expand All @@ -160,6 +171,16 @@ pub struct ClientRouter<Io, Err, PErr> {
_t: PhantomData<Err>,
}

impl<Io, Err, PErr> fmt::Debug for ClientRouter<Io, Err, PErr> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("v3::ClientRouter")
.field("keepalive", &self.keepalive)
.field("disconnect_timeout", &self.disconnect_timeout)
.field("max_receive", &self.max_receive)
.finish()
}
}

impl<Io, Err, PErr> ClientRouter<Io, Err, PErr>
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
Expand All @@ -174,7 +195,7 @@ where
S: Service<Request = Publish, Response = (), Error = PErr> + 'static,
{
self.builder.path(address, self.handlers.len());
self.handlers.push(ntex::boxed::service(service.into_service()));
self.handlers.push(boxed::service(service.into_service()));
self
}

Expand Down
19 changes: 11 additions & 8 deletions src/v3/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,18 +273,21 @@ where
C: ServiceFactory<Config = (), Request = Handshake<Io>, Response = HandshakeAck<Io, St>>,
C::Error: fmt::Debug,
{
ntex::apply(
ntex::service::apply(
Timeout::new(Millis::from(handshake_timeout)),
ntex::fn_factory(move || {
ntex::service::fn_factory(move || {
let pool = pool.clone();
let fut = factory.new_service(());
async move {
let service = fut.await?;
let pool = pool.clone();
let service = Rc::new(service.map_err(MqttError::Service));
Ok::<_, C::InitError>(ntex::apply_fn(service, move |conn: Io, service| {
handshake(conn, None, service.clone(), max_size, pool.clone())
}))
Ok::<_, C::InitError>(ntex::service::apply_fn(
service,
move |conn: Io, service| {
handshake(conn, None, service.clone(), max_size, pool.clone())
},
))
}
}),
)
Expand All @@ -311,16 +314,16 @@ where
C: ServiceFactory<Config = (), Request = Handshake<Io>, Response = HandshakeAck<Io, St>>,
C::Error: fmt::Debug,
{
ntex::apply(
ntex::service::apply(
Timeout::new(Millis::from(handshake_timeout)),
ntex::fn_factory(move || {
ntex::service::fn_factory(move || {
let pool = pool.clone();
let fut = factory.new_service(());
async move {
let service = fut.await?;
let pool = pool.clone();
let service = Rc::new(service.map_err(MqttError::Service));
Ok(ntex::apply_fn(service, move |(io, state), service| {
Ok(ntex::service::apply_fn(service, move |(io, state), service| {
handshake(io, Some(state), service.clone(), max_size, pool.clone())
}))
}
Expand Down
34 changes: 28 additions & 6 deletions src/v5/client/connection.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::time::Instant;
use std::{cell::RefCell, convert::TryFrom, future::Future, marker, num::NonZeroU16, rc::Rc};
use std::{
cell::RefCell, convert::TryFrom, fmt, future::Future, marker, num::NonZeroU16, rc::Rc,
};

use ntex::codec::{AsyncRead, AsyncWrite};
use ntex::router::{IntoPattern, Path, Router, RouterBuilder};
use ntex::service::boxed::BoxService;
use ntex::service::{into_service, IntoService, Service};
use ntex::service::{boxed, into_service, IntoService, Service};
use ntex::time::{sleep, Millis, Seconds};
use ntex::util::{ByteString, Either, HashMap, Ready};

Expand All @@ -26,6 +27,17 @@ pub struct Client<Io> {
pkt: Box<codec::ConnectAck>,
}

impl<Io> fmt::Debug for Client<Io> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("v5::Client")
.field("keepalive", &self.keepalive)
.field("disconnect_timeout", &self.disconnect_timeout)
.field("max_receive", &self.max_receive)
.field("connect", &self.pkt)
.finish()
}
}

impl<T> Client<T>
where
T: AsyncRead + AsyncWrite + Unpin,
Expand Down Expand Up @@ -89,7 +101,7 @@ where
{
let mut builder = Router::build();
builder.path(address, 0);
let handlers = vec![ntex::boxed::service(service.into_service())];
let handlers = vec![boxed::service(service.into_service())];

ClientRouter {
builder,
Expand Down Expand Up @@ -165,7 +177,7 @@ where
}
}

type Handler<E> = BoxService<Publish, PublishAck, E>;
type Handler<E> = boxed::BoxService<Publish, PublishAck, E>;

/// Mqtt client with routing capabilities
pub struct ClientRouter<Io, Err, PErr> {
Expand All @@ -179,6 +191,16 @@ pub struct ClientRouter<Io, Err, PErr> {
_t: marker::PhantomData<Err>,
}

impl<Io, Err, PErr> fmt::Debug for ClientRouter<Io, Err, PErr> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("v5::ClientRouter")
.field("keepalive", &self.keepalive)
.field("disconnect_timeout", &self.disconnect_timeout)
.field("max_receive", &self.max_receive)
.finish()
}
}

impl<Io, Err, PErr> ClientRouter<Io, Err, PErr>
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
Expand All @@ -194,7 +216,7 @@ where
S: Service<Request = Publish, Response = PublishAck, Error = PErr> + 'static,
{
self.builder.path(address, self.handlers.len());
self.handlers.push(ntex::boxed::service(service.into_service()));
self.handlers.push(boxed::service(service.into_service()));
self
}

Expand Down
Loading

0 comments on commit dfa097e

Please sign in to comment.