Skip to content

Commit

Permalink
Fix retained in internal publish
Browse files Browse the repository at this point in the history
  • Loading branch information
davidepianca98 committed Mar 11, 2020
1 parent a6ed165 commit 3b94159
Showing 1 changed file with 52 additions and 33 deletions.
85 changes: 52 additions & 33 deletions src/commonMain/kotlin/mqtt/broker/Broker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class Broker(
it.value.hasSharedSubscriptionMatching(shareName, topicName)?.timestampShareSent ?: Long.MAX_VALUE
}?.value
session?.hasSharedSubscriptionMatching(shareName, topicName)?.let { subscription ->
publish(publisherClientId, retain, topicName, qos, dup, properties, payload, session, subscription)
publishNormal(publisherClientId, retain, topicName, qos, dup, properties, payload, session, subscription)
subscription.timestampShareSent = currentTimeMillis()
}
}
Expand All @@ -79,6 +79,36 @@ class Broker(
session.will = null
}

private fun publishNormal(
publisherClientId: String,
retain: Boolean,
topicName: String,
qos: Qos,
dup: Boolean,
properties: MQTTProperties,
payload: UByteArray?,
session: Session,
subscription: Subscription
) {
if (subscription.options.noLocal && publisherClientId == session.clientId) {
return
}

subscription.subscriptionIdentifier?.let {
properties.subscriptionIdentifier.clear()
properties.subscriptionIdentifier.add(it)
}

session.clientConnection?.publish(
retain,
topicName,
Qos.min(subscription.options.qos, qos),
dup,
properties,
payload
)
}

internal fun publish(
publisherClientId: String,
retain: Boolean,
Expand Down Expand Up @@ -114,7 +144,7 @@ class Broker(
sharedDone += subscription.shareName
}
} else {
publish(
publishNormal(
publisherClientId,
retain,
topicName,
Expand All @@ -130,44 +160,33 @@ class Broker(
}
}

private fun publish(
publisherClientId: String,
retain: Boolean,
topicName: String,
qos: Qos,
dup: Boolean,
properties: MQTTProperties,
payload: UByteArray?,
session: Session,
subscription: Subscription
) {
if (subscription.options.noLocal && publisherClientId == session.clientId) {
return
}

subscription.subscriptionIdentifier?.let {
properties.subscriptionIdentifier.clear()
properties.subscriptionIdentifier.add(it)
}

session.clientConnection?.publish(
retain,
topicName,
Qos.min(subscription.options.qos, qos),
dup,
properties,
payload
)
}

fun publish(
retain: Boolean,
topicName: String,
qos: Qos,
properties: MQTTProperties,
payload: UByteArray?
) {
): Boolean {
if (maximumQos != null && qos > maximumQos) {
return false
}
if (retain) {
if (!retainedAvailable) {
return false
}
val packet = MQTTPublish(
retain,
qos,
false,
topicName,
null,
properties,
payload
)
setRetained(topicName, packet, "")
}
publish("", retain, topicName, qos, false, properties, payload)
return true
}

internal fun setRetained(topicName: String, message: MQTTPublish, clientId: String) {
Expand Down

0 comments on commit 3b94159

Please sign in to comment.