Skip to content

Commit

Permalink
upgrade to ntex 0.5 (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Dec 20, 2021
1 parent c0c571b commit 9c2b463
Show file tree
Hide file tree
Showing 37 changed files with 1,242 additions and 1,527 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.8.0-b.0] - 2021-12-21

* Upgrade to ntex 0.5

## [0.7.7] - 2021-12-17

* Wait for close control message and inner services on dispatcher shutdown #78
Expand Down
9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.7.7"
version = "0.8.0-b.0"
authors = ["ntex contributors <[email protected]>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand All @@ -12,7 +12,7 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"]
edition = "2018"

[dependencies]
ntex = { version = "0.4.11", default-features = false }
ntex = { version = "0.5.0-b.1", default-features = false }
bitflags = "1.3"
derive_more = "0.99"
log = "0.4"
Expand All @@ -23,10 +23,9 @@ pin-project-lite = "0.2"
[dev-dependencies]
env_logger = "0.9"
futures = "0.3"
ntex-tls = "0.1.0-b.1"
rustls = "0.20"
rustls-pemfile = "0.2"
tokio-rustls = "0.23"
openssl = "0.10"
tokio-openssl = "0.6"

ntex = { version = "0.4", features = ["rustls", "openssl"] }
ntex = { version = "0.5.0-b.1", features = ["rustls", "openssl"] }
19 changes: 7 additions & 12 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ impl std::convert::TryFrom<ServerError> for v5::PublishAck {
}
}

async fn handshake_v3<Io>(
handshake: v3::Handshake<Io>,
) -> Result<v3::HandshakeAck<Io, Session>, ServerError> {
async fn handshake_v3(
handshake: v3::Handshake,
) -> Result<v3::HandshakeAck<Session>, ServerError> {
log::info!("new connection: {:?}", handshake);
Ok(handshake.ack(Session, false))
}
Expand All @@ -32,9 +32,9 @@ async fn publish_v3(publish: v3::Publish) -> Result<(), ServerError> {
Ok(())
}

async fn handshake_v5<Io>(
handshake: v5::Handshake<Io>,
) -> Result<v5::HandshakeAck<Io, Session>, ServerError> {
async fn handshake_v5(
handshake: v5::Handshake,
) -> Result<v5::HandshakeAck<Session>, ServerError> {
log::info!("new connection: {:?}", handshake);
Ok(handshake.ack(Session))
}
Expand All @@ -46,16 +46,11 @@ async fn publish_v5(publish: v5::Publish) -> Result<v5::PublishAck, ServerError>

#[ntex::main]
async fn main() -> std::io::Result<()> {
println!("{}", std::mem::size_of::<v5::codec::Publish>());
println!("{}", std::mem::size_of::<v5::codec::PublishProperties>());
println!("{}", std::mem::size_of::<v5::codec::Packet>());
println!("{}", std::mem::size_of::<v5::Handshake<ntex::rt::net::TcpStream>>());
println!("{}", std::mem::size_of::<v5::error::MqttError<()>>());
std::env::set_var("RUST_LOG", "ntex=trace,ntex_mqtt=trace,basic=trace");
env_logger::init();

ntex::server::Server::build()
.bind("mqtt", "127.0.0.1:1883", || {
.bind("mqtt", "127.0.0.1:1883", |_| {
MqttServer::new()
.v3(v3::MqttServer::new(handshake_v3).publish(publish_v3))
.v5(v5::MqttServer::new(handshake_v5).publish(publish_v5))
Expand Down
14 changes: 6 additions & 8 deletions examples/openssl.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use ntex::rt::net::TcpStream;
use ntex::server::openssl::Acceptor;
use ntex::service::pipeline_factory;
use ntex_mqtt::{v3, v5, MqttError, MqttServer};
use ntex_tls::openssl::Acceptor;
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
use tokio_openssl::SslStream;

#[derive(Clone)]
struct Session;
Expand All @@ -26,8 +24,8 @@ impl std::convert::TryFrom<ServerError> for v5::PublishAck {
}

async fn handshake_v3(
handshake: v3::Handshake<SslStream<TcpStream>>,
) -> Result<v3::HandshakeAck<SslStream<TcpStream>, Session>, ServerError> {
handshake: v3::Handshake,
) -> Result<v3::HandshakeAck<Session>, ServerError> {
log::info!("new connection: {:?}", handshake);
Ok(handshake.ack(Session, false))
}
Expand All @@ -38,8 +36,8 @@ async fn publish_v3(publish: v3::Publish) -> Result<(), ServerError> {
}

async fn handshake_v5(
handshake: v5::Handshake<SslStream<TcpStream>>,
) -> Result<v5::HandshakeAck<SslStream<TcpStream>, Session>, ServerError> {
handshake: v5::Handshake,
) -> Result<v5::HandshakeAck<Session>, ServerError> {
log::info!("new connection: {:?}", handshake);
Ok(handshake.ack(Session))
}
Expand All @@ -63,7 +61,7 @@ async fn main() -> std::io::Result<()> {
let acceptor = builder.build();

ntex::server::Server::build()
.bind("mqtt", "127.0.0.1:8883", move || {
.bind("mqtt", "127.0.0.1:8883", move |_| {
pipeline_factory(Acceptor::new(acceptor.clone()))
.map_err(|_err| MqttError::Service(ServerError {}))
.and_then(
Expand Down
20 changes: 9 additions & 11 deletions examples/rustls.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::{fs::File, io::BufReader};
use std::{fs::File, io::BufReader, sync::Arc};

use ntex::rt::net::TcpStream;
use ntex::server::rustls::Acceptor;
use ntex::service::pipeline_factory;
use ntex_mqtt::{v3, v5, MqttError, MqttServer};
use ntex_tls::rustls::Acceptor;
use rustls::{Certificate, PrivateKey, ServerConfig};
use rustls_pemfile::{certs, rsa_private_keys};
use tokio_rustls::server::TlsStream;

#[derive(Clone)]
struct Session;
Expand All @@ -29,8 +27,8 @@ impl std::convert::TryFrom<ServerError> for v5::PublishAck {
}

async fn handshake_v3(
handshake: v3::Handshake<TlsStream<TcpStream>>,
) -> Result<v3::HandshakeAck<TlsStream<TcpStream>, Session>, ServerError> {
handshake: v3::Handshake,
) -> Result<v3::HandshakeAck<Session>, ServerError> {
log::info!("new connection: {:?}", handshake);
Ok(handshake.ack(Session, false))
}
Expand All @@ -41,8 +39,8 @@ async fn publish_v3(publish: v3::Publish) -> Result<(), ServerError> {
}

async fn handshake_v5(
handshake: v5::Handshake<TlsStream<TcpStream>>,
) -> Result<v5::HandshakeAck<TlsStream<TcpStream>, Session>, ServerError> {
handshake: v5::Handshake,
) -> Result<v5::HandshakeAck<Session>, ServerError> {
log::info!("new connection: {:?}", handshake);
Ok(handshake.ack(Session))
}
Expand Down Expand Up @@ -72,11 +70,11 @@ async fn main() -> std::io::Result<()> {
.with_single_cert(cert_chain, keys)
.unwrap();

let tls_acceptor = Acceptor::new(tls_config);
let tls_acceptor = Arc::new(tls_config);

ntex::server::Server::build()
.bind("mqtt", "127.0.0.1:8883", move || {
pipeline_factory(tls_acceptor.clone())
.bind("mqtt", "127.0.0.1:8883", move |_| {
pipeline_factory(Acceptor::new(tls_acceptor.clone()))
.map_err(|_err| MqttError::Service(ServerError {}))
.and_then(
MqttServer::new()
Expand Down
20 changes: 10 additions & 10 deletions examples/session.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::future::ok;
use ntex::service::{fn_factory_with_config, fn_service};
use ntex::util::Ready;
use ntex_mqtt::v5::codec::PublishAckReason;
use ntex_mqtt::{v3, v5, MqttServer};

Expand All @@ -26,9 +26,9 @@ impl std::convert::TryFrom<MyServerError> for v5::PublishAck {
}
}

async fn handshake_v3<Io>(
handshake: v3::Handshake<Io>,
) -> Result<v3::HandshakeAck<Io, MySession>, MyServerError> {
async fn handshake_v3(
handshake: v3::Handshake,
) -> Result<v3::HandshakeAck<MySession>, MyServerError> {
log::info!("new connection: {:?}", handshake);

let session = MySession { client_id: handshake.packet().client_id.to_string() };
Expand Down Expand Up @@ -56,9 +56,9 @@ async fn publish_v3(
}
}

async fn handshake_v5<Io>(
handshake: v5::Handshake<Io>,
) -> Result<v5::HandshakeAck<Io, MySession>, MyServerError> {
async fn handshake_v5(
handshake: v5::Handshake,
) -> Result<v5::HandshakeAck<MySession>, MyServerError> {
log::info!("new connection: {:?}", handshake);

let session = MySession { client_id: handshake.packet().client_id.to_string() };
Expand Down Expand Up @@ -93,18 +93,18 @@ async fn main() -> std::io::Result<()> {
log::info!("Hello");

ntex::server::Server::build()
.bind("mqtt", "127.0.0.1:1883", || {
.bind("mqtt", "127.0.0.1:1883", |_| {
MqttServer::new()
.v3(v3::MqttServer::new(handshake_v3).publish(fn_factory_with_config(
|session: v3::Session<MySession>| {
ok::<_, MyServerError>(fn_service(move |req| {
Ready::Ok::<_, MyServerError>(fn_service(move |req| {
publish_v3(session.clone(), req)
}))
},
)))
.v5(v5::MqttServer::new(handshake_v5).publish(fn_factory_with_config(
|session: v5::Session<MySession>| {
ok::<_, MyServerError>(fn_service(move |req| {
Ready::Ok::<_, MyServerError>(fn_service(move |req| {
publish_v5(session.clone(), req)
}))
},
Expand Down
9 changes: 5 additions & 4 deletions examples/subs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ impl std::convert::TryFrom<MyServerError> for PublishAck {
}
}

async fn handshake<Io>(
handshake: v5::Handshake<Io>,
) -> Result<v5::HandshakeAck<Io, MySession>, MyServerError> {
async fn handshake(
handshake: v5::Handshake,
) -> Result<v5::HandshakeAck<MySession>, MyServerError> {
log::info!("new connection: {:?}", handshake);

let session = MySession {
Expand Down Expand Up @@ -96,6 +96,7 @@ fn control_service_factory() -> impl ServiceFactory<
}
v5::ControlMessage::Unsubscribe(s) => Ready::Ok(s.ack()),
v5::ControlMessage::Closed(c) => Ready::Ok(c.ack()),
v5::ControlMessage::PeerGone(c) => Ready::Ok(c.ack()),
}))
})
}
Expand All @@ -106,7 +107,7 @@ async fn main() -> std::io::Result<()> {
env_logger::init();

ntex::server::Server::build()
.bind("mqtt", "127.0.0.1:1883", || {
.bind("mqtt", "127.0.0.1:1883", |_| {
MqttServer::new(handshake)
.control(control_service_factory())
.publish(fn_factory_with_config(|session: Session<MySession>| {
Expand Down
4 changes: 4 additions & 0 deletions examples/subs_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ async fn main() -> std::io::Result<()> {
log::error!("Protocol error: {:?}", msg);
Ready::Ok(msg.ack())
}
v5::client::ControlMessage::PeerGone(msg) => {
log::warn!("Peer closed connection: {:?}", msg.error());
Ready::Ok(msg.ack())
}
v5::client::ControlMessage::Closed(msg) => {
log::warn!("Server closed connection: {:?}", msg);
Ready::Ok(msg.ack())
Expand Down
Loading

0 comments on commit 9c2b463

Please sign in to comment.