Skip to content

Commit

Permalink
Upgrade to ntex 0.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Dec 30, 2021
1 parent d58183d commit 8f1a184
Show file tree
Hide file tree
Showing 18 changed files with 90 additions and 126 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features
args: --all --features=ntex/tokio

fmt:
name: Rustfmt
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
timeout-minutes: 40
with:
command: test
args: --all --all-features --no-fail-fast -- --nocapture
args: --all --features=ntex/tokio -- --nocapture

- name: Install tarpaulin
if: matrix.version == '1.53.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
Expand All @@ -72,7 +72,7 @@ jobs:
if: matrix.version == '1.53.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
continue-on-error: true
run: |
cargo tarpaulin --out Xml --all --all-features
cargo tarpaulin --out Xml --all --features=ntex/tokio
- name: Upload to Codecov
if: matrix.version == '1.53.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/osx.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --all --all-features --no-fail-fast -- --nocapture
args: --all --features=ntex/tokio -- --nocapture

- name: Clear the cargo caches
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,4 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --all --all-features -- --nocapture
args: --all --features=ntex/tokio -- --nocapture
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.8.0] - 2021-12-30

* Upgrade to ntex 0.5.0

## [0.8.0-b.6] - 2021-12-30

* Update to ntex-io 0.1.0-b.10
Expand Down
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.8.0-b.6"
version = "0.8.0"
authors = ["ntex contributors <[email protected]>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand All @@ -12,7 +12,7 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"]
edition = "2018"

[dependencies]
ntex = { version = "0.5.0-b.7", default-features = false }
ntex = { version = "0.5.0", default-features = false }
bitflags = "1.3"
derive_more = "0.99"
log = "0.4"
Expand All @@ -23,9 +23,9 @@ pin-project-lite = "0.2"
[dev-dependencies]
env_logger = "0.9"
futures = "0.3"
ntex-tls = "0.1.0-b.7"
ntex-tls = "0.1.0"
rustls = "0.20"
rustls-pemfile = "0.2"
openssl = "0.10"

ntex = { version = "0.5.0-b.7", features = ["rustls", "openssl"] }
ntex = { version = "0.5.0", features = ["rustls", "openssl"] }
2 changes: 1 addition & 1 deletion examples/subs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::cell::RefCell;

use ntex::io::seal;
use ntex::io::utils::seal;
use ntex::service::{fn_factory_with_config, fn_service, ServiceFactory};
use ntex::util::{ByteString, Ready};
use ntex_mqtt::v5::{
Expand Down
100 changes: 47 additions & 53 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use std::{
};

use ntex::codec::{Decoder, Encoder};
use ntex::io::{DispatchItem, IoBoxed, IoRef, RecvError, Timer};
use ntex::io::{DispatchItem, IoBoxed, IoRef, IoStatusUpdate, RecvError};
use ntex::service::{IntoService, Service};
use ntex::time::{now, Seconds};
use ntex::time::Seconds;
use ntex::util::{ready, Pool};

type Response<U> = <U as Encoder>::Item;
Expand All @@ -28,19 +28,24 @@ pin_project_lite::pin_project! {
state: Rc<RefCell<DispatcherState<S, U>>>,
inner: DispatcherInner,
st: IoDispatcherState,
ready_err: bool,
flags: Cell<Flags>,
pool: Pool,
#[pin]
response: Option<S::Future>,
response_idx: usize,
}
}

bitflags::bitflags! {
struct Flags: u8 {
const READY_ERR = 0b0001;
const IO_ERR = 0b0010;
}
}

struct DispatcherInner {
io: IoBoxed,
timer: Timer,
updated: Cell<time::Instant>,
keepalive_timeout: Cell<Seconds>,
keepalive_timeout: Cell<time::Duration>,
}

struct DispatcherState<S: Service<DispatchItem<U>>, U: Encoder + Decoder> {
Expand Down Expand Up @@ -83,6 +88,12 @@ impl<S, U> From<S> for IoDispatcherError<S, U> {
}
}

fn insert_flags(flags: &mut Cell<Flags>, f: Flags) {
let mut val = flags.get();
val.insert(f);
flags.set(val);
}

impl<S, U> Dispatcher<S, U>
where
S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
Expand All @@ -94,14 +105,11 @@ where
io: IoBoxed,
codec: U,
service: F,
timer: Timer,
) -> Self {
let updated = Cell::new(now());
let keepalive_timeout = Cell::new(Seconds(30));
let keepalive_timeout = Cell::new(Seconds(30).into());

// register keepalive timer
let expire = updated.get() + time::Duration::from(keepalive_timeout.get());
timer.register(expire, expire, io.as_ref());
io.start_keepalive_timer(keepalive_timeout.get());

let state = Rc::new(RefCell::new(DispatcherState {
error: None,
Expand All @@ -118,8 +126,8 @@ where
service: service.into_service(),
response: None,
response_idx: 0,
ready_err: false,
inner: DispatcherInner { io, timer, updated, keepalive_timeout },
flags: Cell::new(Flags::empty()),
inner: DispatcherInner { io, keepalive_timeout },
}
}

Expand All @@ -129,16 +137,8 @@ where
///
/// By default keep-alive timeout is set to 30 seconds.
pub(crate) fn keepalive_timeout(self, timeout: Seconds) -> Self {
// register keepalive timer
let prev =
self.inner.updated.get() + time::Duration::from(self.inner.keepalive_timeout.get());
if timeout.is_zero() {
self.inner.timer.unregister(prev, self.inner.io.as_ref());
} else {
let expire = self.inner.updated.get() + time::Duration::from(timeout);
self.inner.timer.register(expire, prev, self.inner.io.as_ref());
}

let timeout = timeout.into();
self.inner.io.start_keepalive_timer(timeout);
self.inner.keepalive_timeout.set(timeout);
self
}
Expand All @@ -160,26 +160,13 @@ where
impl DispatcherInner {
fn update_keepalive(&self) {
// update keep-alive timer
let ka = self.keepalive_timeout.get();
if ka.non_zero() {
let updated = now();
if updated != self.updated.get() {
let ka = time::Duration::from(ka);
self.timer.register(updated + ka, self.updated.get() + ka, self.io.as_ref());
self.updated.set(updated);
}
}
self.io.start_keepalive_timer(self.keepalive_timeout.get());
}

fn unregister_keepalive(&self) {
// unregister keep-alive timer
if self.keepalive_timeout.get().non_zero() {
self.keepalive_timeout.set(Seconds::ZERO);
self.timer.unregister(
self.updated.get() + time::Duration::from(self.keepalive_timeout.get()),
self.io.as_ref(),
);
}
self.io.remove_keepalive_timer();
self.keepalive_timeout.set(time::Duration::ZERO);
}
}

Expand Down Expand Up @@ -399,7 +386,7 @@ where
*this.st = IoDispatcherState::Stop;
this.state.borrow_mut().error =
Some(IoDispatcherError::Service(err));
*this.ready_err = true;
insert_flags(&mut *this.flags, Flags::READY_ERR);
}
}
}
Expand All @@ -408,7 +395,7 @@ where
this.inner.unregister_keepalive();

// service may relay on poll_ready for response results
if !*this.ready_err {
if !this.flags.get().contains(Flags::READY_ERR) {
let _ = this.service.poll_ready(cx);
}

Expand All @@ -418,8 +405,21 @@ where
*this.st = IoDispatcherState::Shutdown;
continue;
}
} else {
io.register_dispatcher(cx);
} else if !this.flags.get().contains(Flags::IO_ERR) {
match ready!(this.inner.io.poll_status_update(cx)) {
IoStatusUpdate::PeerGone(_)
| IoStatusUpdate::Stop
| IoStatusUpdate::KeepAlive => {
insert_flags(&mut *this.flags, Flags::IO_ERR);
continue;
}
IoStatusUpdate::WriteBackpressure => {
if ready!(this.inner.io.poll_flush(cx, true)).is_err() {
insert_flags(&mut *this.flags, Flags::IO_ERR);
}
continue;
}
}
}
return Poll::Pending;
}
Expand Down Expand Up @@ -475,10 +475,9 @@ mod tests {
codec: U,
service: F,
) -> (Self, nio::IoRef) {
let timer = Timer::new(Millis::ONE_SEC);
let keepalive_timeout = Cell::new(Seconds(30));
let updated = Cell::new(now());
let keepalive_timeout = Cell::new(Seconds(30).into());
let io = nio::Io::new(io).seal();
io.start_keepalive_timer(keepalive_timeout.get());
let rio = io.get_ref();

let state = Rc::new(RefCell::new(DispatcherState {
Expand All @@ -496,13 +495,8 @@ mod tests {
response: None,
response_idx: 0,
pool: io.memory_pool().pool(),
ready_err: false,
inner: DispatcherInner {
timer,
updated,
keepalive_timeout,
io: IoBoxed::from(io),
},
flags: Cell::new(Flags::empty()),
inner: DispatcherInner { keepalive_timeout, io: IoBoxed::from(io) },
},
rio,
)
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![deny(rust_2018_idioms, warnings)]
#![allow(clippy::type_complexity)]
#![allow(clippy::type_complexity, clippy::return_self_not_must_use)]

//! MQTT Client/Server framework
Expand Down
Loading

0 comments on commit 8f1a184

Please sign in to comment.