Skip to content

Commit

Permalink
Do not poll service for readiness if it failed before
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Oct 20, 2021
1 parent 1fcd7d3 commit 03d02f1
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.7.3] - 2021-10-20

* Do not poll service for readiness if it failed before

## [0.7.2] - 2021-10-01

* Serialize control message handling
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.7.2"
version = "0.7.3"
authors = ["ntex contributors <[email protected]>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand All @@ -12,7 +12,7 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"]
edition = "2018"

[dependencies]
ntex = { version = "0.4.0", default-features = false }
ntex = { version = "0.4.5", default-features = false }
bitflags = "1.3"
derive_more = "0.99"
log = "0.4"
Expand All @@ -28,4 +28,4 @@ tokio-rustls = "0.22"
openssl = "0.10"
tokio-openssl = "0.6"

ntex = { version = "0.4.0", features = ["rustls", "openssl"] }
ntex = { version = "0.4", features = ["rustls", "openssl"] }
69 changes: 65 additions & 4 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ where
*this.st = IoDispatcherState::Stop;
this.inner.borrow_mut().error =
Some(IoDispatcherError::Service(err));
this.state.dispatcher_stopped();
this.state.dispatcher_ready_err();

// unregister keep-alive timer
if this.keepalive_timeout.non_zero() {
Expand All @@ -453,7 +453,9 @@ where
// drain service responses
IoDispatcherState::Stop => {
// service may relay on poll_ready for response results
let _ = this.service.poll_ready(cx);
if !this.state.is_dispatcher_ready_err() {
let _ = this.service.poll_ready(cx);
}

if this.inner.borrow().queue.is_empty() {
this.state.shutdown_io();
Expand Down Expand Up @@ -490,11 +492,13 @@ where

#[cfg(test)]
mod tests {
use std::cell::Cell;

use ntex::channel::condition::Condition;
use ntex::codec::BytesCodec;
use ntex::testing::Io;
use ntex::time::{sleep, Millis};
use ntex::util::Bytes;
use ntex::util::{Bytes, Ready};

use super::*;

Expand Down Expand Up @@ -555,7 +559,7 @@ mod tests {
BytesCodec,
State::new(),
ntex::service::fn_service(|msg: DispatchItem<BytesCodec>| async move {
sleep(time::Duration::from_millis(50)).await;
sleep(Millis(50)).await;
if let DispatchItem::Item(msg) = msg {
Ok::<_, ()>(Some(msg.freeze()))
} else {
Expand All @@ -566,6 +570,11 @@ mod tests {
ntex::rt::spawn(async move {
let _ = disp.await;
});
sleep(Millis(25)).await;
client.write("GET /test HTTP/1\r\n\r\n");

let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));

let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
Expand Down Expand Up @@ -688,4 +697,56 @@ mod tests {
client.close().await;
assert!(client.is_server_dropped());
}

#[ntex::test]
async fn test_err_in_service_ready() {
let (client, server) = Io::create();
client.remote_buffer_cap(0);
client.write("GET /test HTTP/1\r\n\r\n");

let counter = Rc::new(Cell::new(0));

struct Srv(Rc<Cell<usize>>);

impl Service for Srv {
type Request = DispatchItem<BytesCodec>;
type Response = Option<Response<BytesCodec>>;
type Error = ();
type Future = Ready<Option<Response<BytesCodec>>, ()>;

fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), ()>> {
self.0.set(self.0.get() + 1);
Poll::Ready(Err(()))
}

fn call(&self, _: DispatchItem<BytesCodec>) -> Self::Future {
Ready::Ok(None)
}
}

let state = State::new();
let disp = Dispatcher::new(server, BytesCodec, state.clone(), Srv(counter.clone()));
state
.write()
.encode(Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"), &mut BytesCodec)
.unwrap();
ntex::rt::spawn(async move {
let _ = disp.await;
});

// buffer should be flushed
client.remote_buffer_cap(1024);
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));

// write side must be closed, dispatcher waiting for read side to close
assert!(client.is_closed());

// close read side
client.close().await;
assert!(client.is_server_dropped());

// service must be checked for readiness only once
assert_eq!(counter.get(), 1);
}
}

0 comments on commit 03d02f1

Please sign in to comment.