Skip to content

Commit

Permalink
Remove inflight limit for client control service (#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Aug 22, 2022
1 parent b2d6f14 commit 55afff5
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 19 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

* Allow to get inner io stream and codec for negotiated clients

* Remove inflight limit for client's control service

* v3: Add Debug trait for client's ControlMessage

## [0.8.7] - 2022-06-09

* v5: Encoding missing will properties: will_delay_interval_sec, is_utf8_payload, message_expiry_interval, content_type, response_topic, correlation_data, user_properties
Expand Down
2 changes: 2 additions & 0 deletions src/v3/client/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub use crate::v3::control::{
};
use crate::v3::{codec, control::ControlResultKind, error};

#[derive(Debug)]
pub enum ControlMessage<E> {
/// Unhandled publish packet
Publish(Publish),
Expand Down Expand Up @@ -50,6 +51,7 @@ impl<E> ControlMessage<E> {
}
}

#[derive(Debug)]
pub struct Publish(codec::Publish);

impl Publish {
Expand Down
15 changes: 5 additions & 10 deletions src/v3/client/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{future::Future, marker::PhantomData, num::NonZeroU16, pin::Pin, rc::Rc

use ntex::io::DispatchItem;
use ntex::service::Service;
use ntex::util::{buffer::BufferService, inflight::InFlightService, Either, HashSet, Ready};
use ntex::util::{inflight::InFlightService, Either, HashSet, Ready};

use crate::v3::shared::{Ack, MqttShared};
use crate::v3::{codec, control::ControlResultKind, publish::Publish, sink::MqttSink};
Expand All @@ -24,16 +24,11 @@ where
T: Service<Publish, Response = Either<(), Publish>, Error = E> + 'static,
C: Service<ControlMessage<E>, Response = ControlResult, Error = E> + 'static,
{
// limit inflight control messages
let control = BufferService::new(
16,
|| MqttError::<E>::Disconnected(None),
// limit number of in-flight messages
InFlightService::new(1, control.map_err(MqttError::Service)),
);

// limit number of in-flight messages
InFlightService::new(inflight, Dispatcher::new(sink, publish, control))
InFlightService::new(
inflight,
Dispatcher::new(sink, publish, control.map_err(MqttError::Service)),
)
}

/// Mqtt protocol dispatcher
Expand Down
17 changes: 8 additions & 9 deletions src/v5/client/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{future::Future, marker::PhantomData, num::NonZeroU16, pin::Pin, rc::Rc

use ntex::io::DispatchItem;
use ntex::service::Service;
use ntex::util::{buffer::BufferService, inflight::InFlightService, Either, HashSet, Ready};
use ntex::util::{Either, HashSet, Ready};

use crate::error::{MqttError, ProtocolError};
use crate::types::packet_type;
Expand All @@ -26,14 +26,13 @@ where
T: Service<Publish, Response = Either<Publish, PublishAck>, Error = E> + 'static,
C: Service<ControlMessage<E>, Response = ControlResult, Error = E> + 'static,
{
let control = BufferService::new(
16,
|| MqttError::<C::Error>::Disconnected(None),
// limit number of in-flight messages
InFlightService::new(1, control.map_err(MqttError::Service)),
);

Dispatcher::<_, _, E>::new(sink, max_receive as usize, max_topic_alias, publish, control)
Dispatcher::<_, _, E>::new(
sink,
max_receive as usize,
max_topic_alias,
publish,
control.map_err(MqttError::Service),
)
}

/// Mqtt protocol dispatcher
Expand Down

0 comments on commit 55afff5

Please sign in to comment.