From c53a2be03e3053fa40a59642eead16e63e817d9b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 28 Aug 2021 21:41:13 +0600 Subject: [PATCH] use new ntex timer api --- CHANGES.md | 4 +++ Cargo.toml | 6 ++-- examples/client.rs | 10 +++--- src/io.rs | 61 +++++++++++++++------------------ src/server.rs | 68 +++++++++++++------------------------ src/service.rs | 38 ++++++++++----------- src/v3/client/connection.rs | 47 +++++++++++++------------ src/v3/client/connector.rs | 44 +++++++++++------------- src/v3/handshake.rs | 16 +++++---- src/v3/selector.rs | 26 ++++++-------- src/v3/server.rs | 44 ++++++++++++------------ src/v5/client/connection.rs | 47 +++++++++++++------------ src/v5/client/connector.rs | 42 +++++++++++------------ src/v5/selector.rs | 26 ++++++-------- src/v5/server.rs | 50 +++++++++++++-------------- tests/test_server.rs | 4 +-- tests/test_server_v5.rs | 2 +- 17 files changed, 247 insertions(+), 288 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 5048793a..f0e864a6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.7.0-b.8] - 2021-08-28 + +* use new ntex's timer api + ## [0.7.0-b.7] - 2021-08-16 * v3: Boxed Packet::Connect to trim down Packet size diff --git a/Cargo.toml b/Cargo.toml index 38007297..a1d7b884 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "0.7.0-b.7" +version = "0.7.0-b.8" authors = ["ntex contributors "] description = "Client and Server framework for MQTT v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" @@ -12,7 +12,7 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"] edition = "2018" [dependencies] -ntex = { version = "0.4.0-b.2", default-features = false } +ntex = { version = "0.4.0-b.4", default-features = false } bitflags = "1.2" derive_more = "0.99" log = "0.4" @@ -28,4 +28,4 @@ tokio-rustls = "0.22" openssl = "0.10" tokio-openssl = "0.6" -ntex = { version = "0.4.0-b.2", features = ["rustls", "openssl"] } +ntex = { version = "0.4.0-b.4", features = ["rustls", "openssl"] } diff --git a/examples/client.rs b/examples/client.rs index 44aaaede..41b8110e 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,6 +1,4 @@ -use std::time::Duration; - -use ntex::rt::time::delay_for; +use ntex::time::{sleep, Seconds}; use ntex_mqtt::v5; #[derive(Debug)] @@ -32,7 +30,7 @@ async fn main() -> std::io::Result<()> { // connect to server let client = v5::client::MqttConnector::new("127.0.0.1:1883") .client_id("user") - .keep_alive(1) + .keep_alive(Seconds::ONE) .connect() .await .unwrap(); @@ -56,11 +54,11 @@ async fn main() -> std::io::Result<()> { .await .unwrap(); - delay_for(Duration::from_secs(10)).await; + sleep(10_000).await; log::info!("closing connection"); sink.close(); - delay_for(Duration::from_secs(1)).await; + sleep(1_000).await; Ok(()) } diff --git a/src/io.rs b/src/io.rs index 48abe388..14aedb69 100644 --- a/src/io.rs +++ b/src/io.rs @@ -6,7 +6,7 @@ pub(crate) use ntex::framed::{DispatchItem, ReadTask, State, Timer, Write, Write use ntex::codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; use ntex::service::{IntoService, Service}; -use ntex::util::Either; +use ntex::{time::Seconds, util::Either}; type Response = ::Item; @@ -28,7 +28,7 @@ pin_project_lite::pin_project! { st: IoDispatcherState, timer: Timer, updated: time::Instant, - keepalive_timeout: u16, + keepalive_timeout: Seconds, #[pin] response: Option, response_idx: usize, @@ -119,11 +119,11 @@ where T: AsyncRead + AsyncWrite + Unpin + 'static, { let updated = timer.now(); - let keepalive_timeout: u16 = 30; + let keepalive_timeout = Seconds(30); let io = Rc::new(RefCell::new(io)); // register keepalive timer - let expire = updated + time::Duration::from_secs(keepalive_timeout as u64); + let expire = updated + time::Duration::from(keepalive_timeout); timer.register(expire, expire, &state); let inner = Rc::new(RefCell::new(DispatcherState { @@ -155,22 +155,21 @@ where /// To disable timeout set value to 0. /// /// By default keep-alive timeout is set to 30 seconds. - pub(crate) fn keepalive_timeout(mut self, timeout: u16) -> Self { + pub(crate) fn keepalive_timeout(mut self, timeout: Seconds) -> Self { // register keepalive timer - let prev = self.updated + time::Duration::from_secs(self.keepalive_timeout as u64); - if timeout == 0 { + let prev = self.updated + time::Duration::from(self.keepalive_timeout); + if timeout.is_zero() { self.timer.unregister(prev, &self.state); } else { - let expire = self.updated + time::Duration::from_secs(timeout as u64); + let expire = self.updated + time::Duration::from(timeout); self.timer.register(expire, prev, &self.state); } self.keepalive_timeout = timeout; - self } - /// Set connection disconnect timeout in milliseconds. + /// Set connection disconnect timeout. /// /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete /// within this time, the connection get dropped. @@ -178,7 +177,7 @@ where /// To disable timeout set value to 0. /// /// By default disconnect timeout is set to 1 seconds. - pub(crate) fn disconnect_timeout(self, val: u16) -> Self { + pub(crate) fn disconnect_timeout(self, val: Seconds) -> Self { self.state.set_disconnect_timeout(val); self } @@ -287,12 +286,10 @@ where let mut inner = this.inner.borrow_mut(); // unregister keep-alive timer - if *this.keepalive_timeout != 0 { + if this.keepalive_timeout.non_zero() { this.timer.unregister( *this.updated - + time::Duration::from_secs( - *this.keepalive_timeout as u64, - ), + + time::Duration::from(*this.keepalive_timeout), this.state, ); } @@ -322,11 +319,11 @@ where match read.decode(this.codec) { Ok(Some(el)) => { // update keep-alive timer - if *this.keepalive_timeout != 0 { + if this.keepalive_timeout.non_zero() { let updated = this.timer.now(); if updated != *this.updated { - let ka = time::Duration::from_secs( - *this.keepalive_timeout as u64, + let ka = time::Duration::from( + *this.keepalive_timeout, ); this.timer.register( updated + ka, @@ -349,11 +346,11 @@ where *this.st = IoDispatcherState::Stop; // unregister keep-alive timer - if *this.keepalive_timeout != 0 { + if this.keepalive_timeout.non_zero() { this.timer.unregister( *this.updated - + time::Duration::from_secs( - *this.keepalive_timeout as u64, + + time::Duration::from( + *this.keepalive_timeout, ), this.state, ); @@ -440,12 +437,10 @@ where this.state.dispatcher_stopped(); // unregister keep-alive timer - if *this.keepalive_timeout != 0 { + if this.keepalive_timeout.non_zero() { this.timer.unregister( *this.updated - + time::Duration::from_secs( - *this.keepalive_timeout as u64, - ), + + time::Duration::from(*this.keepalive_timeout), this.state, ); } @@ -497,8 +492,8 @@ where mod tests { use ntex::channel::condition::Condition; use ntex::codec::BytesCodec; - use ntex::rt::time::sleep; use ntex::testing::Io; + use ntex::time::{sleep, Millis}; use ntex::util::Bytes; use super::*; @@ -521,8 +516,8 @@ mod tests { where T: AsyncRead + AsyncWrite + Unpin + 'static, { - let timer = Timer::with(time::Duration::from_secs(1)); - let keepalive_timeout = 30; + let timer = Timer::new(Millis::ONE_SEC); + let keepalive_timeout = Seconds(30); let updated = timer.now(); let io = Rc::new(RefCell::new(io)); ntex::rt::spawn(ReadTask::new(io.clone(), state.clone())); @@ -607,12 +602,12 @@ mod tests { ntex::rt::spawn(async move { let _ = disp.await; }); - sleep(time::Duration::from_millis(50)).await; + sleep(50).await; client.write("test"); - sleep(time::Duration::from_millis(50)).await; + sleep(50).await; client.write("test"); - sleep(time::Duration::from_millis(50)).await; + sleep(50).await; condition.notify(); let buf = client.read().await.unwrap(); @@ -642,7 +637,7 @@ mod tests { }), ); ntex::rt::spawn(async move { - let _ = disp.disconnect_timeout(25).await; + let _ = disp.disconnect_timeout(Seconds(1)).await; }); let buf = client.read().await.unwrap(); @@ -653,7 +648,7 @@ mod tests { assert_eq!(buf, Bytes::from_static(b"test")); st.close(); - sleep(time::Duration::from_millis(200)).await; + sleep(1200).await; assert!(client.is_server_dropped()); } diff --git a/src/server.rs b/src/server.rs index afe3e689..b086b8fc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,8 +2,8 @@ use std::task::{Context, Poll}; use std::{convert::TryFrom, fmt, future::Future, io, marker, pin::Pin, rc::Rc, time}; use ntex::codec::{AsyncRead, AsyncWrite}; -use ntex::rt::time::{sleep, Sleep}; use ntex::service::{Service, ServiceFactory}; +use ntex::time::{sleep, Seconds, Sleep}; use ntex::util::{join, Ready}; use crate::error::{MqttError, ProtocolError}; @@ -15,7 +15,7 @@ use crate::{v3, v5}; pub struct MqttServer { v3: V3, v5: V5, - handshake_timeout: usize, + handshake_timeout: Seconds, _t: marker::PhantomData<(Io, Err, InitErr)>, } @@ -33,7 +33,7 @@ impl MqttServer { v3: DefaultProtocolServer::new(ProtocolVersion::MQTT3), v5: DefaultProtocolServer::new(ProtocolVersion::MQTT5), - handshake_timeout: 0, + handshake_timeout: Seconds::ZERO, _t: marker::PhantomData, } } @@ -54,11 +54,11 @@ impl Default } impl MqttServer { - /// Set handshake timeout in millis. + /// Set handshake timeout. /// /// Handshake includes `connect` packet. /// By default handshake timeuot is disabled. - pub fn handshake_timeout(mut self, timeout: usize) -> Self { + pub fn handshake_timeout(mut self, timeout: Seconds) -> Self { self.handshake_timeout = timeout; self } @@ -69,14 +69,14 @@ where Io: AsyncRead + AsyncWrite + Unpin + 'static, V3: ServiceFactory< Config = (), - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, InitError = InitErr, >, V5: ServiceFactory< Config = (), - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, InitError = InitErr, @@ -90,7 +90,7 @@ where Io, impl ServiceFactory< Config = (), - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, InitError = InitErr, @@ -137,7 +137,7 @@ where Io, impl ServiceFactory< Config = (), - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, InitError = InitErr, @@ -167,7 +167,7 @@ where V3, impl ServiceFactory< Config = (), - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, InitError = InitErr, @@ -219,7 +219,7 @@ where V3, impl ServiceFactory< Config = (), - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, InitError = InitErr, @@ -245,14 +245,14 @@ where Io: AsyncRead + AsyncWrite + Unpin + 'static, V3: ServiceFactory< Config = (), - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, InitError = InitErr, >, V5: ServiceFactory< Config = (), - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, InitError = InitErr, @@ -293,23 +293,15 @@ where /// Mqtt Server pub struct MqttServerImpl { handlers: Rc<(V3, V5)>, - handshake_timeout: usize, + handshake_timeout: Seconds, _t: marker::PhantomData<(Io, Err)>, } impl Service for MqttServerImpl where Io: AsyncRead + AsyncWrite + Unpin + 'static, - V3: Service< - Request = (Io, State, Option>>), - Response = (), - Error = MqttError, - >, - V5: Service< - Request = (Io, State, Option>>), - Response = (), - Error = MqttError, - >, + V3: Service), Response = (), Error = MqttError>, + V5: Service), Response = (), Error = MqttError>, { type Request = Io; type Response = (); @@ -339,11 +331,7 @@ where } fn call(&self, req: Io) -> Self::Future { - let delay = if self.handshake_timeout > 0 { - Some(Box::pin(sleep(time::Duration::from_secs(self.handshake_timeout as u64)))) - } else { - None - }; + let delay = self.handshake_timeout.map(sleep); MqttServerImplResponse { state: MqttServerImplState::Version { @@ -357,12 +345,12 @@ pin_project_lite::pin_project! { pub struct MqttServerImplResponse where V3: Service< - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, >, V5: Service< - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, >, @@ -377,23 +365,15 @@ pin_project_lite::pin_project! { pub(crate) enum MqttServerImplState { V3 { #[pin] fut: V3::Future }, V5 { #[pin] fut: V5::Future }, - Version { item: Option<(Io, State, VersionCodec, Rc<(V3, V5)>, Option>>)> }, + Version { item: Option<(Io, State, VersionCodec, Rc<(V3, V5)>, Option)> }, } } impl Future for MqttServerImplResponse where Io: AsyncRead + AsyncWrite + Unpin + 'static, - V3: Service< - Request = (Io, State, Option>>), - Response = (), - Error = MqttError, - >, - V5: Service< - Request = (Io, State, Option>>), - Response = (), - Error = MqttError, - >, + V3: Service), Response = (), Error = MqttError>, + V5: Service), Response = (), Error = MqttError>, { type Output = Result<(), MqttError>; @@ -459,7 +439,7 @@ impl DefaultProtocolServer { impl ServiceFactory for DefaultProtocolServer { type Config = (); - type Request = (Io, State, Option>>); + type Request = (Io, State, Option); type Response = (); type Error = MqttError; type Service = DefaultProtocolServer; @@ -472,7 +452,7 @@ impl ServiceFactory for DefaultProtocolServer Service for DefaultProtocolServer { - type Request = (Io, State, Option>>); + type Request = (Io, State, Option); type Response = (); type Error = MqttError; type Future = Ready; diff --git a/src/service.rs b/src/service.rs index 713636b4..ca92bf3e 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,9 +1,9 @@ use std::task::{Context, Poll}; -use std::{fmt, future::Future, marker::PhantomData, pin::Pin, rc::Rc, time::Duration}; +use std::{fmt, future::Future, marker::PhantomData, pin::Pin, rc::Rc}; use ntex::codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; -use ntex::rt::time::Sleep; use ntex::service::{IntoServiceFactory, Service, ServiceFactory}; +use ntex::time::{Millis, Seconds, Sleep}; use ntex::util::{select, Either}; use super::io::{DispatchItem, Dispatcher, State, Timer}; @@ -13,18 +13,18 @@ type ResponseItem = Option<::Item>; pub(crate) struct FramedService { connect: C, handler: Rc, - disconnect_timeout: u16, + disconnect_timeout: Seconds, time: Timer, _t: PhantomData<(St, Io, Codec)>, } impl FramedService { - pub(crate) fn new(connect: C, service: T, disconnect_timeout: u16) -> Self { + pub(crate) fn new(connect: C, service: T, disconnect_timeout: Seconds) -> Self { FramedService { connect, disconnect_timeout, handler: Rc::new(service), - time: Timer::with(Duration::from_secs(1)), + time: Timer::new(Millis::ONE_SEC), _t: PhantomData, } } @@ -33,7 +33,7 @@ impl FramedService { impl ServiceFactory for FramedService where Io: AsyncRead + AsyncWrite + Unpin + 'static, - C: ServiceFactory, + C: ServiceFactory, C::Error: fmt::Debug, C::Future: 'static, ::Future: 'static, @@ -79,7 +79,7 @@ where pub(crate) struct FramedServiceImpl { connect: C, handler: Rc, - disconnect_timeout: u16, + disconnect_timeout: Seconds, time: Timer, _t: PhantomData<(St, Io, Codec)>, } @@ -87,7 +87,7 @@ pub(crate) struct FramedServiceImpl { impl Service for FramedServiceImpl where Io: AsyncRead + AsyncWrite + Unpin + 'static, - C: Service, + C: Service, C::Error: fmt::Debug, C::Future: 'static, T: ServiceFactory< @@ -137,7 +137,7 @@ where log::trace!("Connection handler is created, starting dispatcher"); Dispatcher::with(io, st, codec, handler, time) - .keepalive_timeout(keepalive as u16) + .keepalive_timeout(keepalive) .disconnect_timeout(timeout) .await }) @@ -147,18 +147,18 @@ where pub(crate) struct FramedService2 { connect: C, handler: Rc, - disconnect_timeout: u16, + disconnect_timeout: Seconds, time: Timer, _t: PhantomData<(St, Io, Codec)>, } impl FramedService2 { - pub(crate) fn new(connect: C, service: T, disconnect_timeout: u16) -> Self { + pub(crate) fn new(connect: C, service: T, disconnect_timeout: Seconds) -> Self { FramedService2 { connect, disconnect_timeout, handler: Rc::new(service), - time: Timer::with(Duration::from_secs(1)), + time: Timer::new(Millis::ONE_SEC), _t: PhantomData, } } @@ -170,7 +170,7 @@ where C: ServiceFactory< Config = (), Request = (Io, State), - Response = (Io, State, Codec, St, u16), + Response = (Io, State, Codec, St, Seconds), >, C::Error: fmt::Debug, C::Future: 'static, @@ -188,7 +188,7 @@ where ::Item: 'static, { type Config = (); - type Request = (Io, State, Option>>); + type Request = (Io, State, Option); type Response = (); type Error = C::Error; type InitError = C::InitError; @@ -217,7 +217,7 @@ where pub(crate) struct FramedServiceImpl2 { connect: C, handler: Rc, - disconnect_timeout: u16, + disconnect_timeout: Seconds, time: Timer, _t: PhantomData<(St, Io, Codec)>, } @@ -225,7 +225,7 @@ pub(crate) struct FramedServiceImpl2 { impl Service for FramedServiceImpl2 where Io: AsyncRead + AsyncWrite + Unpin + 'static, - C: Service, + C: Service, C::Error: fmt::Debug, C::Future: 'static, T: ServiceFactory< @@ -240,7 +240,7 @@ where Codec: Decoder + Encoder + Clone + 'static, ::Item: 'static, { - type Request = (Io, State, Option>>); + type Request = (Io, State, Option); type Response = (); type Error = C::Error; type Future = Pin>>>; @@ -256,7 +256,7 @@ where } #[inline] - fn call(&self, (req, state, delay): (Io, State, Option>>)) -> Self::Future { + fn call(&self, (req, state, delay): (Io, State, Option)) -> Self::Future { log::trace!("Start connection handshake"); let handler = self.handler.clone(); @@ -303,7 +303,7 @@ where }; Dispatcher::with(io, state, codec, handler, time) - .keepalive_timeout(ka as u16) + .keepalive_timeout(ka) .disconnect_timeout(timeout) .await }) diff --git a/src/v3/client/connection.rs b/src/v3/client/connection.rs index e50bcfbc..da4d7c14 100644 --- a/src/v3/client/connection.rs +++ b/src/v3/client/connection.rs @@ -1,9 +1,9 @@ -use std::{future::Future, marker::PhantomData, rc::Rc, time::Duration, time::Instant}; +use std::{future::Future, marker::PhantomData, rc::Rc, time::Instant}; use ntex::codec::{AsyncRead, AsyncWrite}; use ntex::router::{IntoPattern, Router, RouterBuilder}; -use ntex::rt::time::{delay_until, Instant as RtInstant}; use ntex::service::{apply_fn, boxed::BoxService, into_service, IntoService, Service}; +use ntex::time::{sleep, Millis, Seconds}; use ntex::util::{Either, Ready}; use crate::error::{MqttError, ProtocolError}; @@ -18,8 +18,8 @@ use super::dispatcher::create_dispatcher; pub struct Client { io: Io, shared: Rc, - keepalive: u16, - disconnect_timeout: u16, + keepalive: Seconds, + disconnect_timeout: Seconds, session_present: bool, max_receive: usize, } @@ -33,8 +33,8 @@ where io: T, shared: Rc, session_present: bool, - keepalive_timeout: u16, - disconnect_timeout: u16, + keepalive_timeout: Seconds, + disconnect_timeout: Seconds, max_receive: usize, ) -> Self { Client { @@ -92,7 +92,7 @@ where /// /// Default handler closes connection on any control message. pub async fn start_default(self) { - if self.keepalive > 0 { + if self.keepalive.non_zero() { ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } @@ -125,9 +125,9 @@ where Either::Right(Ready::Ok(None)) } }), - Timer::with(Duration::from_secs(1)), + Timer::new(Millis::ONE_SEC), ) - .keepalive_timeout(0) + .keepalive_timeout(Seconds::ZERO) .disconnect_timeout(self.disconnect_timeout) .await; } @@ -139,7 +139,7 @@ where F: IntoService + 'static, S: Service + 'static, { - if self.keepalive > 0 { + if self.keepalive.non_zero() { ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } @@ -172,9 +172,9 @@ where Either::Right(Ready::Ok(None)) } }), - Timer::with(Duration::from_secs(1)), + Timer::new(Millis::ONE_SEC), ) - .keepalive_timeout(0) + .keepalive_timeout(Seconds::ZERO) .disconnect_timeout(self.disconnect_timeout) .await } @@ -188,8 +188,8 @@ pub struct ClientRouter { handlers: Vec>, io: Io, shared: Rc, - keepalive: u16, - disconnect_timeout: u16, + keepalive: Seconds, + disconnect_timeout: Seconds, max_receive: usize, _t: PhantomData, } @@ -214,7 +214,7 @@ where /// Run client with default control messages handler pub async fn start_default(self) { - if self.keepalive > 0 { + if self.keepalive.non_zero() { ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } @@ -249,9 +249,9 @@ where Either::Right(Ready::Ok(None)) } }), - Timer::with(Duration::from_secs(1)), + Timer::new(Millis::ONE_SEC), ) - .keepalive_timeout(0) + .keepalive_timeout(Seconds::ZERO) .disconnect_timeout(self.disconnect_timeout) .await; } @@ -262,7 +262,7 @@ where F: IntoService + 'static, S: Service + 'static, { - if self.keepalive > 0 { + if self.keepalive.non_zero() { ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } @@ -295,9 +295,9 @@ where Either::Right(Ready::Ok(None)) } }), - Timer::with(Duration::from_secs(1)), + Timer::new(Millis::ONE_SEC), ) - .keepalive_timeout(0) + .keepalive_timeout(Seconds::ZERO) .disconnect_timeout(self.disconnect_timeout) .await } @@ -340,13 +340,12 @@ where } } -async fn keepalive(sink: MqttSink, timeout: u16) { +async fn keepalive(sink: MqttSink, timeout: Seconds) { log::debug!("start mqtt client keep-alive task"); - let keepalive = Duration::from_secs(timeout as u64); + let keepalive = Millis::from(timeout); loop { - let expire = RtInstant::from_std(Instant::now() + keepalive); - delay_until(expire).await; + sleep(keepalive).await; if !sink.ping() { // connection is closed diff --git a/src/v3/client/connector.rs b/src/v3/client/connector.rs index ff9e478e..ca14a7bd 100644 --- a/src/v3/client/connector.rs +++ b/src/v3/client/connector.rs @@ -1,9 +1,9 @@ -use std::{future::Future, rc::Rc, time::Duration}; +use std::{future::Future, rc::Rc}; use ntex::codec::{AsyncRead, AsyncWrite}; use ntex::connect::{self, Address, Connect, Connector}; -use ntex::rt::time::delay_for; use ntex::service::Service; +use ntex::time::{timeout, Millis, Seconds}; use ntex::util::{select, ByteString, Bytes, Either}; #[cfg(feature = "openssl")] @@ -24,8 +24,8 @@ pub struct MqttConnector { max_send: usize, max_receive: usize, max_packet_size: u32, - handshake_timeout: u16, - disconnect_timeout: u16, + handshake_timeout: Seconds, + disconnect_timeout: Seconds, pool: Rc, } @@ -43,8 +43,8 @@ where max_send: 16, max_receive: 16, max_packet_size: 64 * 1024, - handshake_timeout: 0, - disconnect_timeout: 3000, + handshake_timeout: Seconds::ZERO, + disconnect_timeout: Seconds(3), pool: Rc::new(MqttSinkPool::default()), } } @@ -77,8 +77,8 @@ where /// A time interval measured in seconds. /// /// keep-alive is set to 30 seconds by default. - pub fn keep_alive(mut self, val: u16) -> Self { - self.pkt.keep_alive = val; + pub fn keep_alive(mut self, val: Seconds) -> Self { + self.pkt.keep_alive = val.seconds() as u16; self } @@ -147,16 +147,16 @@ where self } - /// Set handshake timeout in milliseconds. + /// Set handshake timeout. /// /// Handshake includes `connect` packet and response `connect-ack`. /// By default handshake timeuot is disabled. - pub fn handshake_timeout(mut self, timeout: u16) -> Self { - self.handshake_timeout = timeout as u16; + pub fn handshake_timeout(mut self, timeout: Seconds) -> Self { + self.handshake_timeout = timeout; self } - /// Set client connection disconnect timeout in milliseconds. + /// Set client connection disconnect timeout. /// /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete /// within this time, the connection get dropped. @@ -164,8 +164,8 @@ where /// To disable timeout set value to 0. /// /// By default disconnect timeout is set to 3 seconds. - pub fn disconnect_timeout(mut self, timeout: u16) -> Self { - self.disconnect_timeout = timeout as u16; + pub fn disconnect_timeout(mut self, timeout: Seconds) -> Self { + self.disconnect_timeout = timeout; self } @@ -224,16 +224,12 @@ where /// Connect to mqtt server pub fn connect(&self) -> impl Future, ClientError>> { - if self.handshake_timeout > 0 { - let fut = select( - delay_for(Duration::from_millis(self.handshake_timeout as u64)), - self._connect(), - ); + if self.handshake_timeout.non_zero() { + let fut = timeout(self.handshake_timeout, self._connect()); Either::Left(async move { - let result = fut.await; - match result { - Either::Left(_) => Err(ClientError::HandshakeTimeout), - Either::Right(res) => res.map_err(From::from), + match fut.await { + Ok(res) => res.map_err(From::from), + Err(_) => Err(ClientError::HandshakeTimeout), } }) } else { @@ -278,7 +274,7 @@ where io, shared, session_present, - keepalive_timeout, + Seconds(keepalive_timeout), disconnect_timeout, max_receive, )) diff --git a/src/v3/handshake.rs b/src/v3/handshake.rs index c8b6a798..3b9813b6 100644 --- a/src/v3/handshake.rs +++ b/src/v3/handshake.rs @@ -1,5 +1,7 @@ use std::{fmt, rc::Rc}; +use ntex::time::Seconds; + use super::codec as mqtt; use super::shared::MqttShared; use super::sink::MqttSink; @@ -44,7 +46,7 @@ impl Handshake { lw: 256, read_hw: 4 * 1024, write_hw: 4 * 1024, - keepalive: 30, + keepalive: Seconds(30), return_code: mqtt::ConnectAckReason::ConnectionAccepted, } } @@ -59,7 +61,7 @@ impl Handshake { lw: 256, read_hw: 4 * 1024, write_hw: 4 * 1024, - keepalive: 30, + keepalive: Seconds(30), return_code: mqtt::ConnectAckReason::IdentifierRejected, } } @@ -74,7 +76,7 @@ impl Handshake { lw: 256, read_hw: 4 * 1024, write_hw: 4 * 1024, - keepalive: 30, + keepalive: Seconds(30), return_code: mqtt::ConnectAckReason::BadUserNameOrPassword, } } @@ -89,7 +91,7 @@ impl Handshake { lw: 256, read_hw: 4 * 1024, write_hw: 4 * 1024, - keepalive: 30, + keepalive: Seconds(30), return_code: mqtt::ConnectAckReason::NotAuthorized, } } @@ -104,7 +106,7 @@ impl Handshake { lw: 256, read_hw: 4 * 1024, write_hw: 4 * 1024, - keepalive: 30, + keepalive: Seconds(30), return_code: mqtt::ConnectAckReason::ServiceUnavailable, } } @@ -123,7 +125,7 @@ pub struct HandshakeAck { pub(crate) session_present: bool, pub(crate) return_code: mqtt::ConnectAckReason, pub(crate) shared: Rc, - pub(crate) keepalive: u16, + pub(crate) keepalive: Seconds, pub(crate) lw: u16, pub(crate) read_hw: u16, pub(crate) write_hw: u16, @@ -133,7 +135,7 @@ impl HandshakeAck { /// Set idle time-out for the connection in seconds /// /// By default idle time-out is set to 30 seconds. - pub fn idle_timeout(mut self, timeout: u16) -> Self { + pub fn idle_timeout(mut self, timeout: Seconds) -> Self { self.keepalive = timeout; self } diff --git a/src/v3/selector.rs b/src/v3/selector.rs index 3a0c8d98..502aed17 100644 --- a/src/v3/selector.rs +++ b/src/v3/selector.rs @@ -1,8 +1,8 @@ use std::{fmt, future::Future, marker, pin::Pin, rc::Rc, task::Context, task::Poll, time}; use ntex::codec::{AsyncRead, AsyncWrite}; -use ntex::rt::time::{sleep, Sleep}; use ntex::service::{apply_fn_factory, boxed, IntoServiceFactory, Service, ServiceFactory}; +use ntex::time::{sleep, Seconds, Sleep}; use ntex::util::{timeout::Timeout, timeout::TimeoutError, Either, Ready}; use crate::error::{MqttError, ProtocolError}; @@ -14,7 +14,7 @@ use super::handshake::{Handshake, HandshakeAck}; use super::shared::{MqttShared, MqttSinkPool}; use super::{codec as mqtt, dispatcher::factory, MqttServer, MqttSink, Publish, Session}; -pub(crate) type SelectItem = (Handshake, State, Option>>); +pub(crate) type SelectItem = (Handshake, State, Option); type ServerFactory = boxed::BoxServiceFactory< (), @@ -34,7 +34,7 @@ type Server = pub struct Selector { servers: Vec>, max_size: u32, - handshake_timeout: u16, + handshake_timeout: Seconds, pool: Rc, _t: marker::PhantomData<(Io, Err, InitErr)>, } @@ -45,7 +45,7 @@ impl Selector { Selector { servers: Vec::new(), max_size: 0, - handshake_timeout: 0, + handshake_timeout: Seconds::ZERO, pool: Default::default(), _t: marker::PhantomData, } @@ -58,11 +58,11 @@ where Err: 'static, InitErr: 'static, { - /// Set handshake timeout in millis. + /// Set handshake timeout. /// /// Handshake includes `connect` packet and response `connect-ack`. /// By default handshake timeuot is disabled. - pub fn handshake_timeout(mut self, timeout: u16) -> Self { + pub fn handshake_timeout(mut self, timeout: Seconds) -> Self { self.handshake_timeout = timeout; self } @@ -115,7 +115,7 @@ where self, ) -> impl ServiceFactory< Config = (), - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, InitError = InitErr, @@ -162,7 +162,7 @@ where pub struct SelectorService { servers: Rc>>, max_size: u32, - handshake_timeout: u16, + handshake_timeout: Seconds, pool: Rc, } @@ -212,11 +212,7 @@ where 16, self.pool.clone(), )); - let delay = if self.handshake_timeout > 0 { - Some(Box::pin(sleep(time::Duration::from_secs(self.handshake_timeout as u64)))) - } else { - None - }; + let delay = self.handshake_timeout.map(sleep); Box::pin(async move { // read first packet @@ -275,7 +271,7 @@ where InitErr: 'static, { type Config = (); - type Request = (Io, State, Option>>); + type Request = (Io, State, Option); type Response = (); type Error = MqttError; type InitError = InitErr; @@ -308,7 +304,7 @@ where Io: AsyncRead + AsyncWrite + Unpin + 'static, Err: 'static, { - type Request = (Io, State, Option>>); + type Request = (Io, State, Option); type Response = (); type Error = MqttError; type Future = Pin>>>>; diff --git a/src/v3/server.rs b/src/v3/server.rs index 2d850bf0..7be6b009 100644 --- a/src/v3/server.rs +++ b/src/v3/server.rs @@ -1,9 +1,9 @@ use std::task::{Context, Poll}; -use std::{fmt, future::Future, marker::PhantomData, pin::Pin, rc::Rc, time::Duration}; +use std::{fmt, future::Future, marker::PhantomData, pin::Pin, rc::Rc}; use ntex::codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; -use ntex::rt::time::Sleep; use ntex::service::{apply_fn_factory, IntoServiceFactory, Service, ServiceFactory}; +use ntex::time::{Millis, Seconds, Sleep}; use ntex::util::{timeout::Timeout, timeout::TimeoutError, Either, Ready}; use crate::error::{MqttError, ProtocolError}; @@ -24,8 +24,8 @@ pub struct MqttServer, _t: PhantomData<(Io, St)>, } @@ -55,8 +55,8 @@ where publish: DefaultPublishService::default(), max_size: 0, inflight: 16, - handshake_timeout: 0, - disconnect_timeout: 3000, + handshake_timeout: Seconds::ZERO, + disconnect_timeout: Seconds(3), pool: Default::default(), _t: PhantomData, } @@ -79,16 +79,16 @@ where + From + fmt::Debug, { - /// Set handshake timeout in millis. + /// Set handshake timeout. /// /// Handshake includes `connect` packet and response `connect-ack`. /// By default handshake timeuot is disabled. - pub fn handshake_timeout(mut self, timeout: u16) -> Self { + pub fn handshake_timeout(mut self, timeout: Seconds) -> Self { self.handshake_timeout = timeout; self } - /// Set server connection disconnect timeout in milliseconds. + /// Set server connection disconnect timeout. /// /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete /// within this time, the connection get dropped. @@ -96,7 +96,7 @@ where /// To disable timeout set value to 0. /// /// By default disconnect timeout is set to 3 seconds. - pub fn disconnect_timeout(mut self, val: u16) -> Self { + pub fn disconnect_timeout(mut self, val: Seconds) -> Self { self.disconnect_timeout = val; self } @@ -217,7 +217,7 @@ where self, ) -> impl ServiceFactory< Config = (), - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, InitError = C::InitError, @@ -316,7 +316,7 @@ where handler: Rc::new(handler), max_size: self.max_size, disconnect_timeout: self.disconnect_timeout, - time: Timer::with(Duration::from_secs(1)), + time: Timer::new(Millis::ONE_SEC), _t: PhantomData, } } @@ -325,12 +325,12 @@ where fn handshake_service_factory( factory: C, max_size: u32, - handshake_timeout: u16, + handshake_timeout: Seconds, pool: Rc, ) -> impl ServiceFactory< Config = (), Request = Io, - Response = (Io, State, Rc, Session, u16), + Response = (Io, State, Rc, Session, Seconds), Error = MqttError, > where @@ -339,7 +339,7 @@ where C::Error: fmt::Debug, { ntex::apply( - Timeout::new(Duration::from_millis(handshake_timeout as u64)), + Timeout::new(Millis::from(handshake_timeout)), ntex::fn_factory(move || { let pool = pool.clone(); let fut = factory.new_service(()); @@ -362,12 +362,12 @@ where fn handshake_service_factory2( factory: C, max_size: u32, - handshake_timeout: u16, + handshake_timeout: Seconds, pool: Rc, ) -> impl ServiceFactory< Config = (), Request = (Io, State), - Response = (Io, State, Rc, Session, u16), + Response = (Io, State, Rc, Session, Seconds), Error = MqttError, InitError = C::InitError, > @@ -377,7 +377,7 @@ where C::Error: fmt::Debug, { ntex::apply( - Timeout::new(Duration::from_millis(handshake_timeout as u64)), + Timeout::new(Millis::from(handshake_timeout)), ntex::fn_factory(move || { let pool = pool.clone(); let fut = factory.new_service(()); @@ -403,7 +403,7 @@ async fn handshake( service: S, max_size: u32, pool: Rc, -) -> Result<(Io, State, Rc, Session, u16), S::Error> +) -> Result<(Io, State, Rc, Session, Seconds), S::Error> where Io: AsyncRead + AsyncWrite + Unpin, S: Service, Response = HandshakeAck, Error = MqttError>, @@ -484,7 +484,7 @@ where pub(crate) struct ServerSelector { connect: C, handler: Rc, - disconnect_timeout: u16, + disconnect_timeout: Seconds, time: Timer, check: Rc, max_size: u32, @@ -542,7 +542,7 @@ pub(crate) struct ServerSelectorImpl { check: Rc, connect: Rc, handler: Rc, - disconnect_timeout: u16, + disconnect_timeout: Seconds, time: Timer, max_size: u32, _t: PhantomData<(St, Io, R)>, @@ -651,7 +651,7 @@ where handler, time, ) - .keepalive_timeout(ack.keepalive as u16) + .keepalive_timeout(ack.keepalive) .disconnect_timeout(timeout) .await?; Ok(Either::Right(())) diff --git a/src/v5/client/connection.rs b/src/v5/client/connection.rs index a7a8cf78..6d342d53 100644 --- a/src/v5/client/connection.rs +++ b/src/v5/client/connection.rs @@ -1,11 +1,11 @@ -use std::time::{Duration, Instant}; +use std::time::Instant; use std::{cell::RefCell, convert::TryFrom, future::Future, marker, num::NonZeroU16, rc::Rc}; use ntex::codec::{AsyncRead, AsyncWrite}; use ntex::router::{IntoPattern, Path, Router, RouterBuilder}; -use ntex::rt::time::{delay_until, Instant as RtInstant}; use ntex::service::boxed::BoxService; use ntex::service::{into_service, IntoService, Service}; +use ntex::time::{sleep, Millis, Seconds}; use ntex::util::{ByteString, Either, HashMap, Ready}; use crate::error::MqttError; @@ -20,8 +20,8 @@ use super::dispatcher::create_dispatcher; pub struct Client { io: Io, shared: Rc, - keepalive: u16, - disconnect_timeout: u16, + keepalive: Seconds, + disconnect_timeout: Seconds, max_receive: usize, pkt: Box, } @@ -36,8 +36,8 @@ where shared: Rc, pkt: Box, max_receive: u16, - keepalive: u16, - disconnect_timeout: u16, + keepalive: Seconds, + disconnect_timeout: Seconds, ) -> Self { Client { io, @@ -107,7 +107,7 @@ where /// /// Default handler closes connection on any control message. pub async fn start_default(self) { - if self.keepalive > 0 { + if self.keepalive.non_zero() { ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } @@ -126,9 +126,9 @@ where self.shared.state.clone(), self.shared, dispatcher, - Timer::with(Duration::from_secs(1)), + Timer::new(Millis::ONE_SEC), ) - .keepalive_timeout(0) + .keepalive_timeout(Seconds::ZERO) .disconnect_timeout(self.disconnect_timeout) .await; } @@ -140,7 +140,7 @@ where F: IntoService + 'static, S: Service, Response = ControlResult, Error = E> + 'static, { - if self.keepalive > 0 { + if self.keepalive.non_zero() { ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } @@ -157,9 +157,9 @@ where self.shared.state.clone(), self.shared, dispatcher, - Timer::with(Duration::from_secs(1)), + Timer::new(Millis::ONE_SEC), ) - .keepalive_timeout(0) + .keepalive_timeout(Seconds::ZERO) .disconnect_timeout(self.disconnect_timeout) .await } @@ -173,8 +173,8 @@ pub struct ClientRouter { handlers: Vec>, io: Io, shared: Rc, - keepalive: u16, - disconnect_timeout: u16, + keepalive: Seconds, + disconnect_timeout: Seconds, max_receive: usize, _t: marker::PhantomData, } @@ -200,7 +200,7 @@ where /// Run client with default control messages handler pub async fn start_default(self) { - if self.keepalive > 0 { + if self.keepalive.non_zero() { ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } @@ -219,9 +219,9 @@ where self.shared.state.clone(), self.shared, dispatcher, - Timer::with(Duration::from_secs(1)), + Timer::new(Millis::ONE_SEC), ) - .keepalive_timeout(0) + .keepalive_timeout(Seconds::ZERO) .disconnect_timeout(self.disconnect_timeout) .await; } @@ -233,7 +233,7 @@ where S: Service, Response = ControlResult, Error = Err> + 'static, { - if self.keepalive > 0 { + if self.keepalive.non_zero() { ntex::rt::spawn(keepalive(MqttSink::new(self.shared.clone()), self.keepalive)); } @@ -250,9 +250,9 @@ where self.shared.state.clone(), self.shared, dispatcher, - Timer::with(Duration::from_secs(1)), + Timer::new(Millis::ONE_SEC), ) - .keepalive_timeout(0) + .keepalive_timeout(Seconds::ZERO) .disconnect_timeout(self.disconnect_timeout) .await } @@ -317,13 +317,12 @@ where } } -async fn keepalive(sink: MqttSink, timeout: u16) { +async fn keepalive(sink: MqttSink, timeout: Seconds) { log::debug!("start mqtt client keep-alive task"); - let keepalive = Duration::from_secs(timeout as u64); + let keepalive = Millis::from(timeout); loop { - let expire = RtInstant::from_std(Instant::now() + keepalive); - delay_until(expire).await; + sleep(keepalive).await; if !sink.ping() { // connection is closed diff --git a/src/v5/client/connector.rs b/src/v5/client/connector.rs index 8832ca16..61290666 100644 --- a/src/v5/client/connector.rs +++ b/src/v5/client/connector.rs @@ -2,8 +2,8 @@ use std::{future::Future, num::NonZeroU16, num::NonZeroU32, rc::Rc, time::Durati use ntex::codec::{AsyncRead, AsyncWrite}; use ntex::connect::{self, Address, Connect, Connector}; -use ntex::rt::time::delay_for; use ntex::service::Service; +use ntex::time::{timeout, Seconds}; use ntex::util::{select, ByteString, Bytes, Either}; #[cfg(feature = "openssl")] @@ -21,8 +21,8 @@ pub struct MqttConnector { address: A, connector: T, pkt: codec::Connect, - handshake_timeout: u16, - disconnect_timeout: u16, + handshake_timeout: Seconds, + disconnect_timeout: Seconds, pool: Rc, } @@ -37,8 +37,8 @@ where address, pkt: codec::Connect::default(), connector: Connector::default(), - handshake_timeout: 0, - disconnect_timeout: 3000, + handshake_timeout: Seconds::ZERO, + disconnect_timeout: Seconds(3), pool: Rc::new(MqttSinkPool::default()), } } @@ -71,8 +71,8 @@ where /// A time interval measured in seconds. /// /// keep-alive is set to 30 seconds by default. - pub fn keep_alive(mut self, val: u16) -> Self { - self.pkt.keep_alive = val; + pub fn keep_alive(mut self, val: Seconds) -> Self { + self.pkt.keep_alive = val.seconds() as u16; self } @@ -154,16 +154,16 @@ where self } - /// Set handshake timeout in milliseconds. + /// Set handshake timeout. /// /// Handshake includes `connect` packet and response `connect-ack`. /// By default handshake timeuot is disabled. - pub fn handshake_timeout(mut self, timeout: u16) -> Self { - self.handshake_timeout = timeout as u16; + pub fn handshake_timeout(mut self, timeout: Seconds) -> Self { + self.handshake_timeout = timeout; self } - /// Set client connection disconnect timeout in milliseconds. + /// Set client connection disconnect timeout. /// /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete /// within this time, the connection get dropped. @@ -171,8 +171,8 @@ where /// To disable timeout set value to 0. /// /// By default disconnect timeout is set to 3 seconds. - pub fn disconnect_timeout(mut self, timeout: u16) -> Self { - self.disconnect_timeout = timeout as u16; + pub fn disconnect_timeout(mut self, timeout: Seconds) -> Self { + self.disconnect_timeout = timeout; self } @@ -222,16 +222,12 @@ where /// Connect to mqtt server pub fn connect(&self) -> impl Future, ClientError>> { - if self.handshake_timeout > 0 { - let fut = select( - delay_for(Duration::from_millis(self.handshake_timeout as u64)), - self._connect(), - ); + if self.handshake_timeout.non_zero() { + let fut = timeout(self.handshake_timeout, self._connect()); Either::Left(async move { - let result = fut.await; - match result { - Either::Left(_) => Err(ClientError::HandshakeTimeout), - Either::Right(res) => res.map_err(From::from), + match fut.await { + Ok(res) => res.map_err(From::from), + Err(_) => Err(ClientError::HandshakeTimeout), } }) } else { @@ -285,7 +281,7 @@ where shared, pkt, max_receive, - keep_alive, + Seconds(keep_alive), disconnect_timeout, )) } else { diff --git a/src/v5/selector.rs b/src/v5/selector.rs index ca48dee8..fc0e062e 100644 --- a/src/v5/selector.rs +++ b/src/v5/selector.rs @@ -4,8 +4,8 @@ use std::{ }; use ntex::codec::{AsyncRead, AsyncWrite}; -use ntex::rt::time::{sleep, Sleep}; use ntex::service::{apply_fn_factory, boxed, IntoServiceFactory, Service, ServiceFactory}; +use ntex::time::{sleep, Seconds, Sleep}; use ntex::util::{timeout::Timeout, timeout::TimeoutError, Either, Ready}; use crate::error::{MqttError, ProtocolError}; @@ -18,7 +18,7 @@ use super::publish::{Publish, PublishAck}; use super::shared::{MqttShared, MqttSinkPool}; use super::{codec as mqtt, dispatcher::factory, MqttServer, MqttSink, Session}; -pub(crate) type SelectItem = (Handshake, State, Option>>); +pub(crate) type SelectItem = (Handshake, State, Option); type ServerFactory = boxed::BoxServiceFactory< (), @@ -38,7 +38,7 @@ type Server = pub struct Selector { servers: Vec>, max_size: u32, - handshake_timeout: u16, + handshake_timeout: Seconds, pool: Rc, _t: marker::PhantomData<(Io, Err, InitErr)>, } @@ -49,7 +49,7 @@ impl Selector { Selector { servers: Vec::new(), max_size: 0, - handshake_timeout: 0, + handshake_timeout: Seconds::ZERO, pool: Default::default(), _t: marker::PhantomData, } @@ -62,11 +62,11 @@ where Err: 'static, InitErr: 'static, { - /// Set handshake timeout in millis. + /// Set handshake timeout. /// /// Handshake includes `connect` packet and response `connect-ack`. /// By default handshake timeuot is disabled. - pub fn handshake_timeout(mut self, timeout: u16) -> Self { + pub fn handshake_timeout(mut self, timeout: Seconds) -> Self { self.handshake_timeout = timeout; self } @@ -123,7 +123,7 @@ where self, ) -> impl ServiceFactory< Config = (), - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, InitError = InitErr, @@ -170,7 +170,7 @@ where pub struct SelectorService { servers: Rc>>, max_size: u32, - handshake_timeout: u16, + handshake_timeout: Seconds, pool: Rc, } @@ -220,12 +220,8 @@ where 0, self.pool.clone(), )); - let delay = if self.handshake_timeout > 0 { - Some(Box::pin(sleep(time::Duration::from_secs(self.handshake_timeout as u64)))) - } else { - None - }; + let delay = self.handshake_timeout.map(sleep); Box::pin(async move { // read first packet let packet = state @@ -283,7 +279,7 @@ where InitErr: 'static, { type Config = (); - type Request = (Io, State, Option>>); + type Request = (Io, State, Option); type Response = (); type Error = MqttError; type InitError = InitErr; @@ -316,7 +312,7 @@ where Io: AsyncRead + AsyncWrite + Unpin + 'static, Err: 'static, { - type Request = (Io, State, Option>>); + type Request = (Io, State, Option); type Response = (); type Error = MqttError; type Future = Pin>>>>; diff --git a/src/v5/server.rs b/src/v5/server.rs index 343f072a..f2e6a254 100644 --- a/src/v5/server.rs +++ b/src/v5/server.rs @@ -1,14 +1,12 @@ use std::task::{Context, Poll}; -use std::{ - cell::RefCell, convert::TryFrom, fmt, future::Future, marker, pin::Pin, rc::Rc, - time::Duration, -}; +use std::{cell::RefCell, convert::TryFrom, fmt, future::Future, marker, pin::Pin, rc::Rc}; use ntex::codec::{AsyncRead, AsyncWrite}; use ntex::framed::WriteTask; use ntex::service::{IntoServiceFactory, Service, ServiceFactory}; +use ntex::time::{Millis, Seconds, Sleep}; use ntex::util::timeout::{Timeout, TimeoutError}; -use ntex::{rt::time::Sleep, util::Either}; +use ntex::util::Either; use crate::error::{MqttError, ProtocolError}; use crate::io::{DispatchItem, Dispatcher, State, Timer}; @@ -31,8 +29,8 @@ pub struct MqttServer, - handshake_timeout: u16, - disconnect_timeout: u16, + handshake_timeout: Seconds, + disconnect_timeout: Seconds, max_topic_alias: u16, pub(super) pool: Rc, _t: marker::PhantomData<(Io, St)>, @@ -64,8 +62,8 @@ where max_size: 0, max_receive: 15, max_qos: None, - handshake_timeout: 0, - disconnect_timeout: 3000, + handshake_timeout: Seconds::ZERO, + disconnect_timeout: Seconds(3), max_topic_alias: 32, pool: Rc::new(MqttSinkPool::default()), _t: marker::PhantomData, @@ -87,16 +85,16 @@ where > + 'static, P: ServiceFactory, Request = Publish, Response = PublishAck> + 'static, { - /// Set handshake timeout in millis. + /// Set handshake timeout. /// /// Handshake includes `connect` packet and response `connect-ack`. /// By default handshake timeuot is disabled. - pub fn handshake_timeout(mut self, timeout: u16) -> Self { + pub fn handshake_timeout(mut self, timeout: Seconds) -> Self { self.handshake_timeout = timeout; self } - /// Set server connection disconnect timeout in milliseconds. + /// Set server connection disconnect timeout. /// /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete /// within this time, the connection get dropped. @@ -104,7 +102,7 @@ where /// To disable timeout set value to 0. /// /// By default disconnect timeout is set to 3 seconds. - pub fn disconnect_timeout(mut self, val: u16) -> Self { + pub fn disconnect_timeout(mut self, val: Seconds) -> Self { self.disconnect_timeout = val; self } @@ -247,7 +245,7 @@ where self, ) -> impl ServiceFactory< Config = (), - Request = (Io, State, Option>>), + Request = (Io, State, Option), Response = (), Error = MqttError, InitError = C::InitError, @@ -304,7 +302,7 @@ where max_topic_alias: self.max_topic_alias, max_qos: self.max_qos, disconnect_timeout: self.disconnect_timeout, - time: Timer::with(Duration::from_secs(1)), + time: Timer::new(Millis::ONE_SEC), _t: marker::PhantomData, } } @@ -316,12 +314,12 @@ fn handshake_service_factory( max_receive: u16, max_topic_alias: u16, max_qos: Option, - handshake_timeout: u16, + handshake_timeout: Seconds, pool: Rc, ) -> impl ServiceFactory< Config = (), Request = Io, - Response = (Io, State, Rc, Session, u16), + Response = (Io, State, Rc, Session, Seconds), Error = MqttError, > where @@ -330,7 +328,7 @@ where C::Error: fmt::Debug, { ntex::apply( - Timeout::new(Duration::from_millis(handshake_timeout as u64)), + Timeout::new(Millis::from(handshake_timeout)), ntex::fn_factory(move || { let pool = pool.clone(); @@ -366,12 +364,12 @@ fn handshake_service_factory2( max_receive: u16, max_topic_alias: u16, max_qos: Option, - handshake_timeout: u16, + handshake_timeout: Seconds, pool: Rc, ) -> impl ServiceFactory< Config = (), Request = (Io, State), - Response = (Io, State, Rc, Session, u16), + Response = (Io, State, Rc, Session, Seconds), Error = MqttError, InitError = C::InitError, > @@ -381,7 +379,7 @@ where C::Error: fmt::Debug, { ntex::apply( - Timeout::new(Duration::from_millis(handshake_timeout as u64)), + Timeout::new(Millis::from(handshake_timeout)), ntex::fn_factory(move || { let pool = pool.clone(); let fut = factory.new_service(()); @@ -420,7 +418,7 @@ async fn handshake( mut max_topic_alias: u16, max_qos: Option, pool: Rc, -) -> Result<(Io, State, Rc, Session, u16), S::Error> +) -> Result<(Io, State, Rc, Session, Seconds), S::Error> where Io: AsyncRead + AsyncWrite + Unpin + 'static, S: Service, Response = HandshakeAck, Error = MqttError>, @@ -514,7 +512,7 @@ where max_receive, max_topic_alias, ), - ack.keepalive, + Seconds(ack.keepalive), )) } None => { @@ -559,7 +557,7 @@ pub(crate) struct ServerSelector { max_size: u32, max_receive: u16, max_qos: Option, - disconnect_timeout: u16, + disconnect_timeout: Seconds, max_topic_alias: u16, _t: marker::PhantomData<(St, Io, R)>, } @@ -624,7 +622,7 @@ pub(crate) struct ServerSelectorImpl { max_size: u32, max_receive: u16, max_qos: Option, - disconnect_timeout: u16, + disconnect_timeout: Seconds, max_topic_alias: u16, time: Timer, _t: marker::PhantomData<(St, Io, R)>, @@ -764,7 +762,7 @@ where log::trace!("Connection handler is created, starting dispatcher"); Dispatcher::with(ack.io, shared.state.clone(), shared, handler, time) - .keepalive_timeout(ack.keepalive as u16) + .keepalive_timeout(Seconds(ack.keepalive)) .disconnect_timeout(timeout) .await?; Ok(Either::Right(())) diff --git a/tests/test_server.rs b/tests/test_server.rs index 27b11e79..446a35e6 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -3,8 +3,8 @@ use std::{num::NonZeroU16, time::Duration}; use futures::{future::ok, FutureExt, SinkExt, StreamExt}; use ntex::codec::Framed; -use ntex::rt::time::sleep; use ntex::server; +use ntex::time::{sleep, Seconds}; use ntex::util::{poll_fn, ByteString, Bytes}; use ntex_mqtt::v3::{ @@ -18,7 +18,7 @@ async fn handshake(mut packet: Handshake) -> Result packet.packet_mut(); packet.io(); packet.sink(); - Ok(packet.ack(St, false).idle_timeout(16)) + Ok(packet.ack(St, false).idle_timeout(Seconds(16))) } #[ntex::test] diff --git a/tests/test_server_v5.rs b/tests/test_server_v5.rs index 99bcc718..216c7b11 100644 --- a/tests/test_server_v5.rs +++ b/tests/test_server_v5.rs @@ -3,8 +3,8 @@ use std::{convert::TryFrom, num::NonZeroU16, time::Duration}; use futures::{future::ok, FutureExt, SinkExt, StreamExt}; use ntex::codec::Framed; -use ntex::rt::time::sleep; use ntex::server; +use ntex::time::sleep; use ntex::util::{poll_fn, ByteString, Bytes}; use ntex_mqtt::v5::{