diff --git a/CHANGES.md b/CHANGES.md index 67f129a0..5048793a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,10 @@ # Changes +## [0.7.0-b.7] - 2021-08-16 + +* v3: Boxed Packet::Connect to trim down Packet size +* v5: Boxed Packet::Connect and Packet::ConnAck variants to trim down Packet size + ## [0.7.0-b.6] - 2021-07-28 * v3/v5: Fixed nested with_queues calls in sink impl diff --git a/Cargo.toml b/Cargo.toml index c1c6d47f..38007297 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "ntex-mqtt" -version = "0.7.0-b.6" +version = "0.7.0-b.7" authors = ["ntex contributors "] -description = "MQTT Client/Server framework for v5 and v3.1.1 protocols" +description = "Client and Server framework for MQTT v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" repository = "https://github.com/ntex-rs/ntex-mqtt.git" categories = ["network-programming"] @@ -12,7 +12,7 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"] edition = "2018" [dependencies] -ntex = { version = "0.4.0-b.1", default-features = false } +ntex = { version = "0.4.0-b.2", default-features = false } bitflags = "1.2" derive_more = "0.99" log = "0.4" @@ -28,4 +28,4 @@ tokio-rustls = "0.22" openssl = "0.10" tokio-openssl = "0.6" -ntex = { version = "0.4.0-b.1", features = ["rustls", "openssl"] } +ntex = { version = "0.4.0-b.2", features = ["rustls", "openssl"] } diff --git a/examples/basic.rs b/examples/basic.rs index 9c8db283..e4239a1a 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -46,6 +46,11 @@ async fn publish_v5(publish: v5::Publish) -> Result #[ntex::main] async fn main() -> std::io::Result<()> { + println!("{}", std::mem::size_of::()); + println!("{}", std::mem::size_of::()); + println!("{}", std::mem::size_of::()); + println!("{}", std::mem::size_of::>()); + println!("{}", std::mem::size_of::>()); std::env::set_var("RUST_LOG", "ntex=trace,ntex_mqtt=trace,basic=trace"); env_logger::init(); diff --git a/src/v3/client/connector.rs b/src/v3/client/connector.rs index 3fec825f..ff9e478e 100644 --- a/src/v3/client/connector.rs +++ b/src/v3/client/connector.rs @@ -256,7 +256,7 @@ where let state = State::new(); let codec = codec::Codec::new().max_size(max_packet_size); - state.send(&mut io, &codec, codec::Packet::Connect(pkt)).await?; + state.send(&mut io, &codec, pkt.into()).await?; let packet = state .next(&mut io, &codec) diff --git a/src/v3/codec/decode.rs b/src/v3/codec/decode.rs index 4d16d05a..9897983d 100644 --- a/src/v3/codec/decode.rs +++ b/src/v3/codec/decode.rs @@ -86,14 +86,15 @@ fn decode_connect_packet(src: &mut Bytes) -> Result { }; let password = if flags.contains(ConnectFlags::PASSWORD) { Some(Bytes::decode(src)?) } else { None }; - Ok(Packet::Connect(Connect { + Ok(Connect { clean_session: flags.contains(ConnectFlags::CLEAN_START), keep_alive, client_id, last_will, username, password, - })) + } + .into()) } fn decode_connect_ack_packet(src: &mut Bytes) -> Result { @@ -187,21 +188,21 @@ mod tests { decode_connect_packet(&mut Bytes::from_static( b"\x00\x04MQTT\x04\xC0\x00\x3C\x00\x0512345\x00\x04user\x00\x04pass" )), - Ok(Packet::Connect(Connect { + Ok(Packet::Connect(Box::new(Connect { clean_session: false, keep_alive: 60, client_id: ByteString::try_from(Bytes::from_static(b"12345")).unwrap(), last_will: None, username: Some(ByteString::try_from(Bytes::from_static(b"user")).unwrap()), password: Some(Bytes::from(&b"pass"[..])), - })) + }))) ); assert_eq!( decode_connect_packet(&mut Bytes::from_static( b"\x00\x04MQTT\x04\x14\x00\x3C\x00\x0512345\x00\x05topic\x00\x07message" )), - Ok(Packet::Connect(Connect { + Ok(Packet::Connect(Box::new(Connect { clean_session: false, keep_alive: 60, client_id: ByteString::try_from(Bytes::from_static(b"12345")).unwrap(), @@ -213,7 +214,7 @@ mod tests { }), username: None, password: None, - })) + }))) ); assert_eq!( diff --git a/src/v3/codec/encode.rs b/src/v3/codec/encode.rs index 155f5fc2..11876eee 100644 --- a/src/v3/codec/encode.rs +++ b/src/v3/codec/encode.rs @@ -9,7 +9,7 @@ use super::packet::*; pub(crate) fn get_encoded_size(packet: &Packet) -> usize { match *packet { Packet::Connect ( ref connect ) => { - let Connect {ref last_will, ref client_id, ref username, ref password, ..} = *connect; + let Connect {ref last_will, ref client_id, ref username, ref password, ..} = **connect; // Protocol Name + Protocol Level + Connect Flags + Keep Alive let mut n = 2 + 4 + 1 + 1 + 2; @@ -264,20 +264,20 @@ mod tests { #[test] fn test_encode_connect_packets() { assert_encode_packet( - &Packet::Connect(Connect { + &Packet::Connect(Box::new(Connect { clean_session: false, keep_alive: 60, client_id: ByteString::from_static("12345"), last_will: None, username: Some(ByteString::from_static("user")), password: Some(Bytes::from_static(b"pass")), - }), + })), &b"\x10\x1D\x00\x04MQTT\x04\xC0\x00\x3C\x00\ \x0512345\x00\x04user\x00\x04pass"[..], ); assert_encode_packet( - &Packet::Connect(Connect { + &Packet::Connect(Box::new(Connect { clean_session: false, keep_alive: 60, client_id: ByteString::from_static("12345"), @@ -289,7 +289,7 @@ mod tests { }), username: None, password: None, - }), + })), &b"\x10\x21\x00\x04MQTT\x04\x14\x00\x3C\x00\ \x0512345\x00\x05topic\x00\x07message"[..], ); diff --git a/src/v3/codec/packet.rs b/src/v3/codec/packet.rs index 0317a990..01458d09 100644 --- a/src/v3/codec/packet.rs +++ b/src/v3/codec/packet.rs @@ -123,7 +123,7 @@ pub enum SubscribeReturnCode { /// MQTT Control Packets pub enum Packet { /// Client request to connect to Server - Connect(Connect), + Connect(Box), /// Connect acknowledgment ConnectAck { @@ -194,7 +194,7 @@ pub enum Packet { impl From for Packet { fn from(val: Connect) -> Packet { - Packet::Connect(val) + Packet::Connect(Box::new(val)) } } diff --git a/src/v3/handshake.rs b/src/v3/handshake.rs index b3a7148c..c8b6a798 100644 --- a/src/v3/handshake.rs +++ b/src/v3/handshake.rs @@ -7,12 +7,12 @@ use super::sink::MqttSink; /// Connect message pub struct Handshake { io: Io, - pkt: mqtt::Connect, + pkt: Box, shared: Rc, } impl Handshake { - pub(crate) fn new(pkt: mqtt::Connect, io: Io, shared: Rc) -> Self { + pub(crate) fn new(pkt: Box, io: Io, shared: Rc) -> Self { Self { io, pkt, shared } } diff --git a/src/v5/client/connection.rs b/src/v5/client/connection.rs index 50969165..a7a8cf78 100644 --- a/src/v5/client/connection.rs +++ b/src/v5/client/connection.rs @@ -23,7 +23,7 @@ pub struct Client { keepalive: u16, disconnect_timeout: u16, max_receive: usize, - pkt: codec::ConnectAck, + pkt: Box, } impl Client @@ -34,7 +34,7 @@ where pub(super) fn new( io: T, shared: Rc, - pkt: codec::ConnectAck, + pkt: Box, max_receive: u16, keepalive: u16, disconnect_timeout: u16, diff --git a/src/v5/client/connector.rs b/src/v5/client/connector.rs index 0b3db830..8832ca16 100644 --- a/src/v5/client/connector.rs +++ b/src/v5/client/connector.rs @@ -253,7 +253,7 @@ where let state = State::new(); let codec = codec::Codec::new().max_inbound_size(max_packet_size); - state.send(&mut io, &codec, codec::Packet::Connect(pkt)).await?; + state.send(&mut io, &codec, codec::Packet::Connect(Box::new(pkt))).await?; let packet = state .next(&mut io, &codec) diff --git a/src/v5/codec/decode.rs b/src/v5/codec/decode.rs index cf178315..ff35a123 100644 --- a/src/v5/codec/decode.rs +++ b/src/v5/codec/decode.rs @@ -17,8 +17,8 @@ pub(super) fn decode_packet(mut src: Bytes, first_byte: u8) -> Result Ok(Packet::SubscribeAck(SubscribeAck::decode(&mut src)?)), packet_type::UNSUBSCRIBE => Ok(Packet::Unsubscribe(Unsubscribe::decode(&mut src)?)), packet_type::UNSUBACK => Ok(Packet::UnsubscribeAck(UnsubscribeAck::decode(&mut src)?)), - packet_type::CONNECT => Ok(Packet::Connect(Connect::decode(&mut src)?)), - packet_type::CONNACK => Ok(Packet::ConnectAck(ConnectAck::decode(&mut src)?)), + packet_type::CONNECT => Ok(Packet::Connect(Box::new(Connect::decode(&mut src)?))), + packet_type::CONNACK => Ok(Packet::ConnectAck(Box::new(ConnectAck::decode(&mut src)?))), packet_type::DISCONNECT => Ok(Packet::Disconnect(Disconnect::decode(&mut src)?)), packet_type::AUTH => Ok(Packet::Auth(Auth::decode(&mut src)?)), packet_type::PUBREC => Ok(Packet::PublishReceived(PublishAck::decode(&mut src)?)), @@ -165,11 +165,11 @@ mod tests { assert_decode_packet( b"\x20\x03\x01\x86\x00", - Packet::ConnectAck(ConnectAck { + Packet::ConnectAck(Box::new(ConnectAck { session_present: true, reason_code: ConnectAckReason::BadUserNameOrPassword, ..ConnectAck::default() - }), + })), ); assert_decode_packet([0b1110_0000, 0], Packet::Disconnect(Disconnect::default())); diff --git a/src/v5/codec/encode.rs b/src/v5/codec/encode.rs index 9ed557d3..e7d96fa3 100644 --- a/src/v5/codec/encode.rs +++ b/src/v5/codec/encode.rs @@ -317,7 +317,7 @@ mod tests { #[test] fn test_encode_connect_packets() { assert_encode_packet( - &Packet::Connect(Connect { + &Packet::Connect(Box::new(Connect { clean_start: false, keep_alive: 60, client_id: ByteString::from_static("12345"), @@ -333,13 +333,13 @@ mod tests { topic_alias_max: 0, user_properties: vec![], max_packet_size: None, - }), + })), &b"\x10\x1E\x00\x04MQTT\x05\xC0\x00\x3C\x00\x00\ \x0512345\x00\x04user\x00\x04pass"[..], ); assert_encode_packet( - &Packet::Connect(Connect { + &Packet::Connect(Box::new(Connect { clean_start: false, keep_alive: 60, client_id: ByteString::from_static("12345"), @@ -367,7 +367,7 @@ mod tests { topic_alias_max: 0, user_properties: vec![], max_packet_size: None, - }), + })), &b"\x10\x23\x00\x04MQTT\x05\x14\x00\x3C\x00\x00\ \x0512345\x00\x00\x05topic\x00\x07message"[..], ); diff --git a/src/v5/codec/packet/mod.rs b/src/v5/codec/packet/mod.rs index 7180ee0a..d973dbf1 100644 --- a/src/v5/codec/packet/mod.rs +++ b/src/v5/codec/packet/mod.rs @@ -28,9 +28,9 @@ pub use subscribe::*; /// MQTT Control Packets pub enum Packet { /// Client request to connect to Server - Connect(Connect), + Connect(Box), /// Connect acknowledgment - ConnectAck(ConnectAck), + ConnectAck(Box), /// Publish message Publish(Publish), /// Publish acknowledgment diff --git a/src/v5/error.rs b/src/v5/error.rs index 331f63b3..f4751be7 100644 --- a/src/v5/error.rs +++ b/src/v5/error.rs @@ -9,7 +9,7 @@ pub use crate::v5::codec; pub enum ClientError { /// Connect negotiation failed #[display(fmt = "Connect ack failed: {:?}", _0)] - Ack(codec::ConnectAck), + Ack(Box), /// Protocol error #[display(fmt = "Protocol error: {:?}", _0)] Protocol(ProtocolError), diff --git a/src/v5/handshake.rs b/src/v5/handshake.rs index 4cdd1afb..892ddfa8 100644 --- a/src/v5/handshake.rs +++ b/src/v5/handshake.rs @@ -5,7 +5,7 @@ use super::{codec, shared::MqttShared, sink::MqttSink}; /// Handshake message pub struct Handshake { io: Io, - pkt: codec::Connect, + pkt: Box, pub(super) shared: Rc, pub(super) max_size: u32, pub(super) max_receive: u16, @@ -14,7 +14,7 @@ pub struct Handshake { impl Handshake { pub(crate) fn new( - pkt: codec::Connect, + pkt: Box, io: Io, shared: Rc, max_size: u32, diff --git a/src/v5/server.rs b/src/v5/server.rs index d7f673e3..343f072a 100644 --- a/src/v5/server.rs +++ b/src/v5/server.rs @@ -497,7 +497,11 @@ where state.set_buffer_params(ack.read_hw, ack.write_hw, ack.lw); state - .send(&mut ack.io, &shared.codec, mqtt::Packet::ConnectAck(ack.packet)) + .send( + &mut ack.io, + &shared.codec, + mqtt::Packet::ConnectAck(Box::new(ack.packet)), + ) .await?; Ok(( @@ -521,7 +525,10 @@ where .shared .state .write() - .encode(mqtt::Packet::ConnectAck(ack.packet), &ack.shared.codec) + .encode( + mqtt::Packet::ConnectAck(Box::new(ack.packet)), + &ack.shared.codec, + ) .is_ok() { WriteTask::shutdown( @@ -743,7 +750,7 @@ where .send( &mut ack.io, &shared.codec, - mqtt::Packet::ConnectAck(ack.packet), + mqtt::Packet::ConnectAck(Box::new(ack.packet)), ) .await?; @@ -770,7 +777,10 @@ where .shared .state .write() - .encode(mqtt::Packet::ConnectAck(ack.packet), &ack.shared.codec) + .encode( + mqtt::Packet::ConnectAck(Box::new(ack.packet)), + &ack.shared.codec, + ) .is_ok() { WriteTask::shutdown( diff --git a/tests/test_server.rs b/tests/test_server.rs index 3080c0a6..27b11e79 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -123,7 +123,7 @@ async fn test_ping() -> std::io::Result<()> { let io = srv.connect().await.unwrap(); let mut framed = Framed::new(io, codec::Codec::default()); framed - .send(codec::Packet::Connect(codec::Connect::default().client_id("user"))) + .send(codec::Packet::Connect(codec::Connect::default().client_id("user").into())) .await .unwrap(); framed.next().await.unwrap().unwrap(); @@ -157,10 +157,7 @@ async fn test_ack_order() -> std::io::Result<()> { let io = srv.connect().await.unwrap(); let mut framed = Framed::new(io, codec::Codec::default()); - framed - .send(codec::Packet::Connect(codec::Connect::default().client_id("user"))) - .await - .unwrap(); + framed.send(codec::Connect::default().client_id("user").into()).await.unwrap(); let _ = framed.next().await.unwrap().unwrap(); framed @@ -306,7 +303,7 @@ async fn test_handle_incoming() -> std::io::Result<()> { let io = srv.connect().await.unwrap(); let mut framed = Framed::new(io, codec::Codec::default()); - framed.write(codec::Packet::Connect(codec::Connect::default().client_id("user"))).unwrap(); + framed.write(codec::Connect::default().client_id("user").into()).unwrap(); framed .write( codec::Publish { diff --git a/tests/test_server_v5.rs b/tests/test_server_v5.rs index fbd6fe2b..99bcc718 100644 --- a/tests/test_server_v5.rs +++ b/tests/test_server_v5.rs @@ -160,7 +160,7 @@ async fn test_ping() -> std::io::Result<()> { let io = srv.connect().await.unwrap(); let mut framed = Framed::new(io, codec::Codec::new()); framed - .send(codec::Packet::Connect(codec::Connect::default().client_id("user"))) + .send(codec::Packet::Connect(Box::new(codec::Connect::default().client_id("user")))) .await .unwrap(); let _ = framed.next().await.unwrap().unwrap(); @@ -197,7 +197,7 @@ async fn test_ack_order() -> std::io::Result<()> { let io = srv.connect().await.unwrap(); let mut framed = Framed::new(io, codec::Codec::default()); framed - .send(codec::Packet::Connect(codec::Connect::default().client_id("user"))) + .send(codec::Packet::Connect(Box::new(codec::Connect::default().client_id("user")))) .await .unwrap(); let _ = framed.next().await.unwrap().unwrap(); @@ -268,9 +268,9 @@ async fn test_dups() { let io = srv.connect().await.unwrap(); let mut framed = Framed::new(io, codec::Codec::default()); framed - .send(codec::Packet::Connect( + .send(codec::Packet::Connect(Box::new( codec::Connect::default().client_id("user").receive_max(2), - )) + ))) .await .unwrap(); let _ = framed.next().await.unwrap().unwrap(); @@ -385,19 +385,19 @@ async fn test_max_receive() { let mut framed = Framed::new(io, codec::Codec::default()); framed - .send(codec::Packet::Connect(codec::Connect::default().client_id("user"))) + .send(codec::Packet::Connect(Box::new(codec::Connect::default().client_id("user")))) .await .unwrap(); let ack = framed.next().await.unwrap().unwrap(); assert_eq!( ack, - codec::Packet::ConnectAck(codec::ConnectAck { + codec::Packet::ConnectAck(Box::new(codec::ConnectAck { receive_max: Some(NonZeroU16::new(1).unwrap()), max_qos: Some(codec::QoS::AtLeastOnce), reason_code: codec::ConnectAckReason::Success, topic_alias_max: 32, ..Default::default() - }) + })) ); framed @@ -657,7 +657,7 @@ async fn test_suback_with_reason() -> std::io::Result<()> { let io = srv.connect().await.unwrap(); let mut framed = Framed::new(io, codec::Codec::new()); framed - .send(codec::Packet::Connect(codec::Connect::default().client_id("user"))) + .send(codec::Packet::Connect(Box::new(codec::Connect::default().client_id("user")))) .await .unwrap(); let _ = framed.next().await.unwrap().unwrap(); @@ -720,7 +720,9 @@ async fn test_handle_incoming() -> std::io::Result<()> { let io = srv.connect().await.unwrap(); let mut framed = Framed::new(io, codec::Codec::default()); - framed.write(codec::Packet::Connect(codec::Connect::default().client_id("user"))).unwrap(); + framed + .write(codec::Packet::Connect(Box::new(codec::Connect::default().client_id("user")))) + .unwrap(); framed.write(pkt_publish().into()).unwrap(); framed .write(codec::Packet::Disconnect(codec::Disconnect {