Skip to content

Commit

Permalink
Box CONNECT/CONNACK in codecs to cut packet size (#60)
Browse files Browse the repository at this point in the history
* Box CONNECT/CONNACK in codecs to cut packet size

- Box v3::codec::Packet::Connect
- Box v5::codec::Packet::Connect and ConnAck

Cuts down struct sizes:
v3:
Packet: 184 -> 80
Handshake: 216 -> 48

v5:
Packet: 416 -> 256
Handshake: 456 -> 56

* upd CHANGES.md and version

* fmt
  • Loading branch information
nayato authored Aug 17, 2021
1 parent 58c29f1 commit 630cba3
Show file tree
Hide file tree
Showing 18 changed files with 75 additions and 55 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changes

## [0.7.0-b.7] - 2021-08-16

* v3: Boxed Packet::Connect to trim down Packet size
* v5: Boxed Packet::Connect and Packet::ConnAck variants to trim down Packet size

## [0.7.0-b.6] - 2021-07-28

* v3/v5: Fixed nested with_queues calls in sink impl
Expand Down
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "ntex-mqtt"
version = "0.7.0-b.6"
version = "0.7.0-b.7"
authors = ["ntex contributors <[email protected]>"]
description = "MQTT Client/Server framework for v5 and v3.1.1 protocols"
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
repository = "https://github.com/ntex-rs/ntex-mqtt.git"
categories = ["network-programming"]
Expand All @@ -12,7 +12,7 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"]
edition = "2018"

[dependencies]
ntex = { version = "0.4.0-b.1", default-features = false }
ntex = { version = "0.4.0-b.2", default-features = false }
bitflags = "1.2"
derive_more = "0.99"
log = "0.4"
Expand All @@ -28,4 +28,4 @@ tokio-rustls = "0.22"
openssl = "0.10"
tokio-openssl = "0.6"

ntex = { version = "0.4.0-b.1", features = ["rustls", "openssl"] }
ntex = { version = "0.4.0-b.2", features = ["rustls", "openssl"] }
5 changes: 5 additions & 0 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ async fn publish_v5(publish: v5::Publish) -> Result<v5::PublishAck, ServerError>

#[ntex::main]
async fn main() -> std::io::Result<()> {
println!("{}", std::mem::size_of::<v5::codec::Publish>());
println!("{}", std::mem::size_of::<v5::codec::PublishProperties>());
println!("{}", std::mem::size_of::<v5::codec::Packet>());
println!("{}", std::mem::size_of::<v5::Handshake<ntex::rt::net::TcpStream>>());
println!("{}", std::mem::size_of::<v5::error::MqttError<()>>());
std::env::set_var("RUST_LOG", "ntex=trace,ntex_mqtt=trace,basic=trace");
env_logger::init();

Expand Down
2 changes: 1 addition & 1 deletion src/v3/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ where
let state = State::new();
let codec = codec::Codec::new().max_size(max_packet_size);

state.send(&mut io, &codec, codec::Packet::Connect(pkt)).await?;
state.send(&mut io, &codec, pkt.into()).await?;

let packet = state
.next(&mut io, &codec)
Expand Down
13 changes: 7 additions & 6 deletions src/v3/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,15 @@ fn decode_connect_packet(src: &mut Bytes) -> Result<Packet, DecodeError> {
};
let password =
if flags.contains(ConnectFlags::PASSWORD) { Some(Bytes::decode(src)?) } else { None };
Ok(Packet::Connect(Connect {
Ok(Connect {
clean_session: flags.contains(ConnectFlags::CLEAN_START),
keep_alive,
client_id,
last_will,
username,
password,
}))
}
.into())
}

fn decode_connect_ack_packet(src: &mut Bytes) -> Result<Packet, DecodeError> {
Expand Down Expand Up @@ -187,21 +188,21 @@ mod tests {
decode_connect_packet(&mut Bytes::from_static(
b"\x00\x04MQTT\x04\xC0\x00\x3C\x00\x0512345\x00\x04user\x00\x04pass"
)),
Ok(Packet::Connect(Connect {
Ok(Packet::Connect(Box::new(Connect {
clean_session: false,
keep_alive: 60,
client_id: ByteString::try_from(Bytes::from_static(b"12345")).unwrap(),
last_will: None,
username: Some(ByteString::try_from(Bytes::from_static(b"user")).unwrap()),
password: Some(Bytes::from(&b"pass"[..])),
}))
})))
);

assert_eq!(
decode_connect_packet(&mut Bytes::from_static(
b"\x00\x04MQTT\x04\x14\x00\x3C\x00\x0512345\x00\x05topic\x00\x07message"
)),
Ok(Packet::Connect(Connect {
Ok(Packet::Connect(Box::new(Connect {
clean_session: false,
keep_alive: 60,
client_id: ByteString::try_from(Bytes::from_static(b"12345")).unwrap(),
Expand All @@ -213,7 +214,7 @@ mod tests {
}),
username: None,
password: None,
}))
})))
);

assert_eq!(
Expand Down
10 changes: 5 additions & 5 deletions src/v3/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::packet::*;
pub(crate) fn get_encoded_size(packet: &Packet) -> usize {
match *packet {
Packet::Connect ( ref connect ) => {
let Connect {ref last_will, ref client_id, ref username, ref password, ..} = *connect;
let Connect {ref last_will, ref client_id, ref username, ref password, ..} = **connect;

// Protocol Name + Protocol Level + Connect Flags + Keep Alive
let mut n = 2 + 4 + 1 + 1 + 2;
Expand Down Expand Up @@ -264,20 +264,20 @@ mod tests {
#[test]
fn test_encode_connect_packets() {
assert_encode_packet(
&Packet::Connect(Connect {
&Packet::Connect(Box::new(Connect {
clean_session: false,
keep_alive: 60,
client_id: ByteString::from_static("12345"),
last_will: None,
username: Some(ByteString::from_static("user")),
password: Some(Bytes::from_static(b"pass")),
}),
})),
&b"\x10\x1D\x00\x04MQTT\x04\xC0\x00\x3C\x00\
\x0512345\x00\x04user\x00\x04pass"[..],
);

assert_encode_packet(
&Packet::Connect(Connect {
&Packet::Connect(Box::new(Connect {
clean_session: false,
keep_alive: 60,
client_id: ByteString::from_static("12345"),
Expand All @@ -289,7 +289,7 @@ mod tests {
}),
username: None,
password: None,
}),
})),
&b"\x10\x21\x00\x04MQTT\x04\x14\x00\x3C\x00\
\x0512345\x00\x05topic\x00\x07message"[..],
);
Expand Down
4 changes: 2 additions & 2 deletions src/v3/codec/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub enum SubscribeReturnCode {
/// MQTT Control Packets
pub enum Packet {
/// Client request to connect to Server
Connect(Connect),
Connect(Box<Connect>),

/// Connect acknowledgment
ConnectAck {
Expand Down Expand Up @@ -194,7 +194,7 @@ pub enum Packet {

impl From<Connect> for Packet {
fn from(val: Connect) -> Packet {
Packet::Connect(val)
Packet::Connect(Box::new(val))
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/v3/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use super::sink::MqttSink;
/// Connect message
pub struct Handshake<Io> {
io: Io,
pkt: mqtt::Connect,
pkt: Box<mqtt::Connect>,
shared: Rc<MqttShared>,
}

impl<Io> Handshake<Io> {
pub(crate) fn new(pkt: mqtt::Connect, io: Io, shared: Rc<MqttShared>) -> Self {
pub(crate) fn new(pkt: Box<mqtt::Connect>, io: Io, shared: Rc<MqttShared>) -> Self {
Self { io, pkt, shared }
}

Expand Down
4 changes: 2 additions & 2 deletions src/v5/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct Client<Io> {
keepalive: u16,
disconnect_timeout: u16,
max_receive: usize,
pkt: codec::ConnectAck,
pkt: Box<codec::ConnectAck>,
}

impl<T> Client<T>
Expand All @@ -34,7 +34,7 @@ where
pub(super) fn new(
io: T,
shared: Rc<MqttShared>,
pkt: codec::ConnectAck,
pkt: Box<codec::ConnectAck>,
max_receive: u16,
keepalive: u16,
disconnect_timeout: u16,
Expand Down
2 changes: 1 addition & 1 deletion src/v5/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ where
let state = State::new();
let codec = codec::Codec::new().max_inbound_size(max_packet_size);

state.send(&mut io, &codec, codec::Packet::Connect(pkt)).await?;
state.send(&mut io, &codec, codec::Packet::Connect(Box::new(pkt))).await?;

let packet = state
.next(&mut io, &codec)
Expand Down
8 changes: 4 additions & 4 deletions src/v5/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ pub(super) fn decode_packet(mut src: Bytes, first_byte: u8) -> Result<Packet, De
packet_type::SUBACK => Ok(Packet::SubscribeAck(SubscribeAck::decode(&mut src)?)),
packet_type::UNSUBSCRIBE => Ok(Packet::Unsubscribe(Unsubscribe::decode(&mut src)?)),
packet_type::UNSUBACK => Ok(Packet::UnsubscribeAck(UnsubscribeAck::decode(&mut src)?)),
packet_type::CONNECT => Ok(Packet::Connect(Connect::decode(&mut src)?)),
packet_type::CONNACK => Ok(Packet::ConnectAck(ConnectAck::decode(&mut src)?)),
packet_type::CONNECT => Ok(Packet::Connect(Box::new(Connect::decode(&mut src)?))),
packet_type::CONNACK => Ok(Packet::ConnectAck(Box::new(ConnectAck::decode(&mut src)?))),
packet_type::DISCONNECT => Ok(Packet::Disconnect(Disconnect::decode(&mut src)?)),
packet_type::AUTH => Ok(Packet::Auth(Auth::decode(&mut src)?)),
packet_type::PUBREC => Ok(Packet::PublishReceived(PublishAck::decode(&mut src)?)),
Expand Down Expand Up @@ -165,11 +165,11 @@ mod tests {

assert_decode_packet(
b"\x20\x03\x01\x86\x00",
Packet::ConnectAck(ConnectAck {
Packet::ConnectAck(Box::new(ConnectAck {
session_present: true,
reason_code: ConnectAckReason::BadUserNameOrPassword,
..ConnectAck::default()
}),
})),
);

assert_decode_packet([0b1110_0000, 0], Packet::Disconnect(Disconnect::default()));
Expand Down
8 changes: 4 additions & 4 deletions src/v5/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ mod tests {
#[test]
fn test_encode_connect_packets() {
assert_encode_packet(
&Packet::Connect(Connect {
&Packet::Connect(Box::new(Connect {
clean_start: false,
keep_alive: 60,
client_id: ByteString::from_static("12345"),
Expand All @@ -333,13 +333,13 @@ mod tests {
topic_alias_max: 0,
user_properties: vec![],
max_packet_size: None,
}),
})),
&b"\x10\x1E\x00\x04MQTT\x05\xC0\x00\x3C\x00\x00\
\x0512345\x00\x04user\x00\x04pass"[..],
);

assert_encode_packet(
&Packet::Connect(Connect {
&Packet::Connect(Box::new(Connect {
clean_start: false,
keep_alive: 60,
client_id: ByteString::from_static("12345"),
Expand Down Expand Up @@ -367,7 +367,7 @@ mod tests {
topic_alias_max: 0,
user_properties: vec![],
max_packet_size: None,
}),
})),
&b"\x10\x23\x00\x04MQTT\x05\x14\x00\x3C\x00\x00\
\x0512345\x00\x00\x05topic\x00\x07message"[..],
);
Expand Down
4 changes: 2 additions & 2 deletions src/v5/codec/packet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ pub use subscribe::*;
/// MQTT Control Packets
pub enum Packet {
/// Client request to connect to Server
Connect(Connect),
Connect(Box<Connect>),
/// Connect acknowledgment
ConnectAck(ConnectAck),
ConnectAck(Box<ConnectAck>),
/// Publish message
Publish(Publish),
/// Publish acknowledgment
Expand Down
2 changes: 1 addition & 1 deletion src/v5/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub use crate::v5::codec;
pub enum ClientError {
/// Connect negotiation failed
#[display(fmt = "Connect ack failed: {:?}", _0)]
Ack(codec::ConnectAck),
Ack(Box<codec::ConnectAck>),
/// Protocol error
#[display(fmt = "Protocol error: {:?}", _0)]
Protocol(ProtocolError),
Expand Down
4 changes: 2 additions & 2 deletions src/v5/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::{codec, shared::MqttShared, sink::MqttSink};
/// Handshake message
pub struct Handshake<Io> {
io: Io,
pkt: codec::Connect,
pkt: Box<codec::Connect>,
pub(super) shared: Rc<MqttShared>,
pub(super) max_size: u32,
pub(super) max_receive: u16,
Expand All @@ -14,7 +14,7 @@ pub struct Handshake<Io> {

impl<Io> Handshake<Io> {
pub(crate) fn new(
pkt: codec::Connect,
pkt: Box<codec::Connect>,
io: Io,
shared: Rc<MqttShared>,
max_size: u32,
Expand Down
18 changes: 14 additions & 4 deletions src/v5/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,11 @@ where

state.set_buffer_params(ack.read_hw, ack.write_hw, ack.lw);
state
.send(&mut ack.io, &shared.codec, mqtt::Packet::ConnectAck(ack.packet))
.send(
&mut ack.io,
&shared.codec,
mqtt::Packet::ConnectAck(Box::new(ack.packet)),
)
.await?;

Ok((
Expand All @@ -521,7 +525,10 @@ where
.shared
.state
.write()
.encode(mqtt::Packet::ConnectAck(ack.packet), &ack.shared.codec)
.encode(
mqtt::Packet::ConnectAck(Box::new(ack.packet)),
&ack.shared.codec,
)
.is_ok()
{
WriteTask::shutdown(
Expand Down Expand Up @@ -743,7 +750,7 @@ where
.send(
&mut ack.io,
&shared.codec,
mqtt::Packet::ConnectAck(ack.packet),
mqtt::Packet::ConnectAck(Box::new(ack.packet)),
)
.await?;

Expand All @@ -770,7 +777,10 @@ where
.shared
.state
.write()
.encode(mqtt::Packet::ConnectAck(ack.packet), &ack.shared.codec)
.encode(
mqtt::Packet::ConnectAck(Box::new(ack.packet)),
&ack.shared.codec,
)
.is_ok()
{
WriteTask::shutdown(
Expand Down
9 changes: 3 additions & 6 deletions tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async fn test_ping() -> std::io::Result<()> {
let io = srv.connect().await.unwrap();
let mut framed = Framed::new(io, codec::Codec::default());
framed
.send(codec::Packet::Connect(codec::Connect::default().client_id("user")))
.send(codec::Packet::Connect(codec::Connect::default().client_id("user").into()))
.await
.unwrap();
framed.next().await.unwrap().unwrap();
Expand Down Expand Up @@ -157,10 +157,7 @@ async fn test_ack_order() -> std::io::Result<()> {

let io = srv.connect().await.unwrap();
let mut framed = Framed::new(io, codec::Codec::default());
framed
.send(codec::Packet::Connect(codec::Connect::default().client_id("user")))
.await
.unwrap();
framed.send(codec::Connect::default().client_id("user").into()).await.unwrap();
let _ = framed.next().await.unwrap().unwrap();

framed
Expand Down Expand Up @@ -306,7 +303,7 @@ async fn test_handle_incoming() -> std::io::Result<()> {

let io = srv.connect().await.unwrap();
let mut framed = Framed::new(io, codec::Codec::default());
framed.write(codec::Packet::Connect(codec::Connect::default().client_id("user"))).unwrap();
framed.write(codec::Connect::default().client_id("user").into()).unwrap();
framed
.write(
codec::Publish {
Expand Down
Loading

0 comments on commit 630cba3

Please sign in to comment.