Skip to content

Commit

Permalink
Expose some control plane type constructors. (#69)
Browse files Browse the repository at this point in the history
* Expose some control plane types.

* Don't advertise ctors in publis docs.
  • Loading branch information
vadim-kovalyov authored Oct 28, 2021
1 parent 965c09d commit 6c9bda4
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 20 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.7.4] - 2021-10-26

* Expose some control plane type constructors

## [0.7.3] - 2021-10-20

* Do not poll service for readiness if it failed before
Expand Down
33 changes: 28 additions & 5 deletions src/v3/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,31 @@ pub(crate) enum ControlResultKind {
}

impl<E> ControlMessage<E> {
pub(crate) fn ping() -> Self {
/// Create a new PING `ControlMessage`.
#[doc(hidden)]
pub fn ping() -> Self {
ControlMessage::Ping(Ping)
}

pub(crate) fn pkt_disconnect() -> Self {
/// Create a new `ControlMessage` from SUBSCRIBE packet.
#[doc(hidden)]
pub fn subscribe(pkt: Subscribe) -> Self {
ControlMessage::Subscribe(pkt)
}

/// Create a new `ControlMessage` from UNSUBSCRIBE packet.
#[doc(hidden)]
pub fn unsubscribe(pkt: Unsubscribe) -> Self {
ControlMessage::Unsubscribe(pkt)
}

/// Create a new `ControlMessage` from DISCONNECT packet.
#[doc(hidden)]
pub fn dis() -> Self {
ControlMessage::Disconnect(Disconnect)
}

pub(crate) fn closed(is_error: bool) -> Self {
pub(super) fn closed(is_error: bool) -> Self {
ControlMessage::Closed(Closed::new(is_error))
}

Expand All @@ -59,6 +75,7 @@ impl<E> ControlMessage<E> {
ControlMessage::ProtocolError(ProtocolError::new(err))
}

/// Disconnects the client by sending DISCONNECT packet.
pub fn disconnect(&self) -> ControlResult {
ControlResult { result: ControlResultKind::Disconnect }
}
Expand Down Expand Up @@ -158,7 +175,10 @@ pub(crate) struct SubscribeResult {
}

impl Subscribe {
pub(crate) fn new(packet_id: NonZeroU16, topics: Vec<(ByteString, QoS)>) -> Self {
/// Create a new `Subscribe` control message from packet id and
/// a list of topics.
#[doc(hidden)]
pub fn new(packet_id: NonZeroU16, topics: Vec<(ByteString, QoS)>) -> Self {
let mut codes = Vec::with_capacity(topics.len());
(0..topics.len()).for_each(|_| codes.push(codec::SubscribeReturnCode::Failure));

Expand Down Expand Up @@ -281,7 +301,10 @@ pub(crate) struct UnsubscribeResult {
}

impl Unsubscribe {
pub(crate) fn new(packet_id: NonZeroU16, topics: Vec<ByteString>) -> Self {
/// Create a new `Unsubscribe` control message from packet id and
/// a list of topics.
#[doc(hidden)]
pub fn new(packet_id: NonZeroU16, topics: Vec<ByteString>) -> Self {
Self { packet_id, topics }
}

Expand Down
6 changes: 3 additions & 3 deletions src/v3/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ where
}

Either::Right(Either::Right(ControlResponse::new(
ControlMessage::Subscribe(Subscribe::new(packet_id, topic_filters)),
ControlMessage::subscribe(Subscribe::new(packet_id, topic_filters)),
&self.inner,
)))
}
Expand All @@ -203,12 +203,12 @@ where
}

Either::Right(Either::Right(ControlResponse::new(
ControlMessage::Unsubscribe(Unsubscribe::new(packet_id, topic_filters)),
ControlMessage::unsubscribe(Unsubscribe::new(packet_id, topic_filters)),
&self.inner,
)))
}
DispatchItem::Item(codec::Packet::Disconnect) => Either::Right(Either::Right(
ControlResponse::new(ControlMessage::pkt_disconnect(), &self.inner),
ControlResponse::new(ControlMessage::dis(), &self.inner),
)),
DispatchItem::Item(_) => Either::Right(Either::Left(Ready::Ok(None))),
DispatchItem::EncoderError(err) => {
Expand Down
5 changes: 4 additions & 1 deletion src/v3/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ pub struct Publish {
pub struct PublishAck;

impl Publish {
pub(crate) fn new(publish: codec::Publish) -> Self {
/// Create a new `Publish` message from a PUBLISH
/// packet
#[doc(hidden)]
pub fn new(publish: codec::Publish) -> Self {
Self { topic: Path::new(publish.topic.clone()), publish }
}

Expand Down
48 changes: 41 additions & 7 deletions src/v5/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,21 @@ use crate::error;
/// Control plain messages
#[derive(Debug)]
pub enum ControlMessage<E> {
/// Auth packet from a client
Auth(Auth),
/// Ping packet from a client
Ping(Ping),
/// Disconnect packet from a client
Disconnect(Disconnect),
/// Subscribe packet from a client
Subscribe(Subscribe),
/// Unsubscribe packet from a client
Unsubscribe(Unsubscribe),
/// Underlying transport connection closed
Closed(Closed),
/// Unhandled application level error from handshake, publish and control services
Error(Error<E>),
/// Protocol level error
ProtocolError(ProtocolError),
}

Expand All @@ -26,15 +34,33 @@ pub struct ControlResult {
}

impl<E> ControlMessage<E> {
pub(super) fn auth(pkt: codec::Auth) -> Self {
/// Create a new `ControlMessage` from AUTH packet.
#[doc(hidden)]
pub fn auth(pkt: codec::Auth) -> Self {
ControlMessage::Auth(Auth(pkt))
}

pub(super) fn ping() -> Self {
/// Create a new `ControlMessage` from SUBSCRIBE packet.
#[doc(hidden)]
pub fn subscribe(pkt: codec::Subscribe) -> Self {
ControlMessage::Subscribe(Subscribe::new(pkt))
}

/// Create a new `ControlMessage` from UNSUBSCRIBE packet.
#[doc(hidden)]
pub fn unsubscribe(pkt: codec::Unsubscribe) -> Self {
ControlMessage::Unsubscribe(Unsubscribe::new(pkt))
}

/// Create a new PING `ControlMessage`.
#[doc(hidden)]
pub fn ping() -> Self {
ControlMessage::Ping(Ping)
}

pub(super) fn dis(pkt: codec::Disconnect) -> Self {
/// Create a new `ControlMessage` from DISCONNECT packet.
#[doc(hidden)]
pub fn dis(pkt: codec::Disconnect) -> Self {
ControlMessage::Disconnect(Disconnect(pkt))
}

Expand All @@ -50,6 +76,8 @@ impl<E> ControlMessage<E> {
ControlMessage::ProtocolError(ProtocolError::new(err))
}

/// Disconnects the client by sending DISCONNECT packet
/// with `NormalDisconnection` reason code.
pub fn disconnect(&self) -> ControlResult {
let pkt = codec::Disconnect {
reason_code: codec::DisconnectReasonCode::NormalDisconnection,
Expand All @@ -61,6 +89,8 @@ impl<E> ControlMessage<E> {
ControlResult { packet: Some(codec::Packet::Disconnect(pkt)), disconnect: true }
}

/// Disconnects the client by sending DISCONNECT packet
/// with provided reason code.
pub fn disconnect_with(&self, pkt: codec::Disconnect) -> ControlResult {
ControlResult { packet: Some(codec::Packet::Disconnect(pkt)), disconnect: true }
}
Expand Down Expand Up @@ -112,7 +142,9 @@ pub struct Subscribe {
}

impl Subscribe {
pub(crate) fn create<E>(packet: codec::Subscribe) -> ControlMessage<E> {
/// Create a new `Subscribe` control message from a Subscribe
/// packet
pub fn new(packet: codec::Subscribe) -> Self {
let mut status = Vec::with_capacity(packet.topic_filters.len());
(0..packet.topic_filters.len())
.for_each(|_| status.push(codec::SubscribeAckReason::UnspecifiedError));
Expand All @@ -124,7 +156,7 @@ impl Subscribe {
reason_string: None,
};

ControlMessage::Subscribe(Self { packet, result })
Self { packet, result }
}

#[inline]
Expand Down Expand Up @@ -261,7 +293,9 @@ pub struct Unsubscribe {
}

impl Unsubscribe {
pub(crate) fn create<E>(packet: codec::Unsubscribe) -> ControlMessage<E> {
/// Create a new `Unsubscribe` control message from an Unsubscribe
/// packet
pub fn new(packet: codec::Unsubscribe) -> Self {
let mut status = Vec::with_capacity(packet.topic_filters.len());
(0..packet.topic_filters.len())
.for_each(|_| status.push(codec::UnsubscribeAckReason::Success));
Expand All @@ -273,7 +307,7 @@ impl Unsubscribe {
reason_string: None,
};

ControlMessage::Unsubscribe(Self { packet, result })
Self { packet, result }
}

/// Unsubscribe packet user properties
Expand Down
4 changes: 2 additions & 2 deletions src/v5/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ where
}
let id = pkt.packet_id;
Either::Right(Either::Right(
ControlResponse::new(control::Subscribe::create(pkt), &self.inner)
ControlResponse::new(ControlMessage::subscribe(pkt), &self.inner)
.packet_id(id),
))
}
Expand All @@ -297,7 +297,7 @@ where
}
let id = pkt.packet_id;
Either::Right(Either::Right(
ControlResponse::new(control::Unsubscribe::create(pkt), &self.inner)
ControlResponse::new(ControlMessage::unsubscribe(pkt), &self.inner)
.packet_id(id),
))
}
Expand Down
7 changes: 5 additions & 2 deletions src/v5/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ pub struct Publish {
}

impl Publish {
pub(crate) fn new(publish: codec::Publish) -> Self {
/// Create a new `Publish` message from a PUBLISH
/// packet
#[doc(hidden)]
pub fn new(publish: codec::Publish) -> Self {
Self { topic: Path::new(publish.topic.clone()), publish }
}

Expand Down Expand Up @@ -112,7 +115,7 @@ pub struct PublishAck {
}

impl PublishAck {
/// Create new `PublishAck` instance
/// Create new `PublishAck` instance from a reason code.
pub fn new(code: codec::PublishAckReason) -> Self {
PublishAck {
reason_code: code,
Expand Down

0 comments on commit 6c9bda4

Please sign in to comment.