Skip to content

Commit

Permalink
use new ntex timer api
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Aug 28, 2021
1 parent 630cba3 commit c53a2be
Show file tree
Hide file tree
Showing 17 changed files with 247 additions and 288 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.7.0-b.8] - 2021-08-28

* use new ntex's timer api

## [0.7.0-b.7] - 2021-08-16

* v3: Boxed Packet::Connect to trim down Packet size
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.7.0-b.7"
version = "0.7.0-b.8"
authors = ["ntex contributors <[email protected]>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand All @@ -12,7 +12,7 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"]
edition = "2018"

[dependencies]
ntex = { version = "0.4.0-b.2", default-features = false }
ntex = { version = "0.4.0-b.4", default-features = false }
bitflags = "1.2"
derive_more = "0.99"
log = "0.4"
Expand All @@ -28,4 +28,4 @@ tokio-rustls = "0.22"
openssl = "0.10"
tokio-openssl = "0.6"

ntex = { version = "0.4.0-b.2", features = ["rustls", "openssl"] }
ntex = { version = "0.4.0-b.4", features = ["rustls", "openssl"] }
10 changes: 4 additions & 6 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::time::Duration;

use ntex::rt::time::delay_for;
use ntex::time::{sleep, Seconds};
use ntex_mqtt::v5;

#[derive(Debug)]
Expand Down Expand Up @@ -32,7 +30,7 @@ async fn main() -> std::io::Result<()> {
// connect to server
let client = v5::client::MqttConnector::new("127.0.0.1:1883")
.client_id("user")
.keep_alive(1)
.keep_alive(Seconds::ONE)
.connect()
.await
.unwrap();
Expand All @@ -56,11 +54,11 @@ async fn main() -> std::io::Result<()> {
.await
.unwrap();

delay_for(Duration::from_secs(10)).await;
sleep(10_000).await;

log::info!("closing connection");
sink.close();
delay_for(Duration::from_secs(1)).await;
sleep(1_000).await;

Ok(())
}
61 changes: 28 additions & 33 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub(crate) use ntex::framed::{DispatchItem, ReadTask, State, Timer, Write, Write

use ntex::codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
use ntex::service::{IntoService, Service};
use ntex::util::Either;
use ntex::{time::Seconds, util::Either};

type Response<U> = <U as Encoder>::Item;

Expand All @@ -28,7 +28,7 @@ pin_project_lite::pin_project! {
st: IoDispatcherState,
timer: Timer,
updated: time::Instant,
keepalive_timeout: u16,
keepalive_timeout: Seconds,
#[pin]
response: Option<S::Future>,
response_idx: usize,
Expand Down Expand Up @@ -119,11 +119,11 @@ where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
let updated = timer.now();
let keepalive_timeout: u16 = 30;
let keepalive_timeout = Seconds(30);
let io = Rc::new(RefCell::new(io));

// register keepalive timer
let expire = updated + time::Duration::from_secs(keepalive_timeout as u64);
let expire = updated + time::Duration::from(keepalive_timeout);
timer.register(expire, expire, &state);

let inner = Rc::new(RefCell::new(DispatcherState {
Expand Down Expand Up @@ -155,30 +155,29 @@ where
/// To disable timeout set value to 0.
///
/// By default keep-alive timeout is set to 30 seconds.
pub(crate) fn keepalive_timeout(mut self, timeout: u16) -> Self {
pub(crate) fn keepalive_timeout(mut self, timeout: Seconds) -> Self {
// register keepalive timer
let prev = self.updated + time::Duration::from_secs(self.keepalive_timeout as u64);
if timeout == 0 {
let prev = self.updated + time::Duration::from(self.keepalive_timeout);
if timeout.is_zero() {
self.timer.unregister(prev, &self.state);
} else {
let expire = self.updated + time::Duration::from_secs(timeout as u64);
let expire = self.updated + time::Duration::from(timeout);
self.timer.register(expire, prev, &self.state);
}

self.keepalive_timeout = timeout;

self
}

/// Set connection disconnect timeout in milliseconds.
/// Set connection disconnect timeout.
///
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
/// within this time, the connection get dropped.
///
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 1 seconds.
pub(crate) fn disconnect_timeout(self, val: u16) -> Self {
pub(crate) fn disconnect_timeout(self, val: Seconds) -> Self {
self.state.set_disconnect_timeout(val);
self
}
Expand Down Expand Up @@ -287,12 +286,10 @@ where
let mut inner = this.inner.borrow_mut();

// unregister keep-alive timer
if *this.keepalive_timeout != 0 {
if this.keepalive_timeout.non_zero() {
this.timer.unregister(
*this.updated
+ time::Duration::from_secs(
*this.keepalive_timeout as u64,
),
+ time::Duration::from(*this.keepalive_timeout),
this.state,
);
}
Expand Down Expand Up @@ -322,11 +319,11 @@ where
match read.decode(this.codec) {
Ok(Some(el)) => {
// update keep-alive timer
if *this.keepalive_timeout != 0 {
if this.keepalive_timeout.non_zero() {
let updated = this.timer.now();
if updated != *this.updated {
let ka = time::Duration::from_secs(
*this.keepalive_timeout as u64,
let ka = time::Duration::from(
*this.keepalive_timeout,
);
this.timer.register(
updated + ka,
Expand All @@ -349,11 +346,11 @@ where
*this.st = IoDispatcherState::Stop;

// unregister keep-alive timer
if *this.keepalive_timeout != 0 {
if this.keepalive_timeout.non_zero() {
this.timer.unregister(
*this.updated
+ time::Duration::from_secs(
*this.keepalive_timeout as u64,
+ time::Duration::from(
*this.keepalive_timeout,
),
this.state,
);
Expand Down Expand Up @@ -440,12 +437,10 @@ where
this.state.dispatcher_stopped();

// unregister keep-alive timer
if *this.keepalive_timeout != 0 {
if this.keepalive_timeout.non_zero() {
this.timer.unregister(
*this.updated
+ time::Duration::from_secs(
*this.keepalive_timeout as u64,
),
+ time::Duration::from(*this.keepalive_timeout),
this.state,
);
}
Expand Down Expand Up @@ -497,8 +492,8 @@ where
mod tests {
use ntex::channel::condition::Condition;
use ntex::codec::BytesCodec;
use ntex::rt::time::sleep;
use ntex::testing::Io;
use ntex::time::{sleep, Millis};
use ntex::util::Bytes;

use super::*;
Expand All @@ -521,8 +516,8 @@ mod tests {
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
let timer = Timer::with(time::Duration::from_secs(1));
let keepalive_timeout = 30;
let timer = Timer::new(Millis::ONE_SEC);
let keepalive_timeout = Seconds(30);
let updated = timer.now();
let io = Rc::new(RefCell::new(io));
ntex::rt::spawn(ReadTask::new(io.clone(), state.clone()));
Expand Down Expand Up @@ -607,12 +602,12 @@ mod tests {
ntex::rt::spawn(async move {
let _ = disp.await;
});
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;

client.write("test");
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
client.write("test");
sleep(time::Duration::from_millis(50)).await;
sleep(50).await;
condition.notify();

let buf = client.read().await.unwrap();
Expand Down Expand Up @@ -642,7 +637,7 @@ mod tests {
}),
);
ntex::rt::spawn(async move {
let _ = disp.disconnect_timeout(25).await;
let _ = disp.disconnect_timeout(Seconds(1)).await;
});

let buf = client.read().await.unwrap();
Expand All @@ -653,7 +648,7 @@ mod tests {
assert_eq!(buf, Bytes::from_static(b"test"));

st.close();
sleep(time::Duration::from_millis(200)).await;
sleep(1200).await;
assert!(client.is_server_dropped());
}

Expand Down
Loading

0 comments on commit c53a2be

Please sign in to comment.