
Race condition in server when accepting a connection #94

Closed
sophokles73 opened this issue May 24, 2018 · 50 comments

@sophokles73
Contributor

I am connecting a client using vertx-mqtt to a server implemented on top of vertx-mqtt. The client is implemented to connect to the server, wait for the server's CONNACK and then start publishing some messages.

Every once in a while, the server runs into the following problem when the client sends its first message:

MQTT09:55:42.051 [vert.x-eventloop-thread-0] ERROR io.vertx.core.impl.ContextImpl - Unhandled exception
MQTTjava.lang.IllegalStateException: Received an MQTT packet from a not connected client (CONNECT not sent yet)
MQTT	at io.vertx.mqtt.impl.MqttServerConnection.checkConnected(MqttServerConnection.java:408)
MQTT	at io.vertx.mqtt.impl.MqttServerConnection.handlePublish(MqttServerConnection.java:324)
MQTT	at io.vertx.mqtt.impl.MqttServerConnection.handleMessage(MqttServerConnection.java:136)
MQTT	at io.vertx.mqtt.impl.MqttServerImpl.lambda$null$3(MqttServerImpl.java:93)
MQTT	at io.vertx.core.net.impl.NetSocketImpl.handleMessageReceived(NetSocketImpl.java:351)
MQTT	at io.vertx.core.net.impl.NetServerImpl$2.handleMessage(NetServerImpl.java:446)
MQTT	at io.vertx.core.net.impl.NetServerImpl$2.handleMessage(NetServerImpl.java:443)
MQTT	at io.vertx.core.net.impl.VertxHandler.lambda$channelRead$1(VertxHandler.java:146)

I suspect a race condition in MqttEndpointImpl between the connect handler and the publish handler. The former sends the CONNACK packet to the client before it sets its connected flag to true. The latter then checks if the client is connected before processing the published message. What seems to happen is that the server sends the CONNACK packet and is then interrupted before setting the connected flag to true. The client then sends its first message and the connected check in the publish handler fails ...

My understanding is that access to the connected flag should either be synchronized, or the connected flag should be declared volatile and the connect handler should set it to true before sending the CONNACK packet ...
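To illustrate the ordering I mean, here is a minimal, self-contained sketch (plain JDK threads and latches, no Vert.x or Netty involved; all class and method names are made up for the illustration). It forces the suspected interleaving deterministically: if the flag is only set after the CONNACK is "sent", a publish handled in between observes connected == false, while setting the flag first makes the check pass:

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical model of the suspected interleaving: "sending" CONNACK is
// simulated by a latch, and the client publishes as soon as the CONNACK is out.
public class ConnackRace {

    static volatile boolean connected;

    // Returns what the publish handler's connected-check observed.
    // setFlagBeforeSend == true models the proposed fix (flag first, then CONNACK).
    static boolean publishCheckPasses(boolean setFlagBeforeSend) throws InterruptedException {
        connected = false;
        CountDownLatch connackSent = new CountDownLatch(1);
        CountDownLatch publishHandled = new CountDownLatch(1);
        boolean[] observed = new boolean[1];

        Thread client = new Thread(() -> {
            try {
                connackSent.await();        // client received CONNACK and publishes immediately
                observed[0] = connected;    // server's publish handler checks the flag
                publishHandled.countDown();
            } catch (InterruptedException ignored) {
            }
        });
        client.start();

        if (setFlagBeforeSend) {
            connected = true;               // fixed order: flag before CONNACK
        }
        connackSent.countDown();            // CONNACK goes out
        publishHandled.await();             // the publish arrives before we continue ...
        connected = true;                   // buggy order: flag set only now (no-op if already set)
        client.join();
        return observed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("CONNACK first, flag later -> check passes: " + publishCheckPasses(false));
        System.out.println("flag first, CONNACK later -> check passes: " + publishCheckPasses(true));
    }
}
```

In the real code the latches would correspond to Netty's write completing and the next channelRead; the point is only the order of the flag write relative to the CONNACK write (plus volatile or synchronization for cross-thread visibility).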

@vietj
Contributor

vietj commented May 24, 2018

@sophokles73 can you provide a patch and possibly a test ?

@ppatierno
Member

@vietj discussing with @sophokles73, I'll take a look into it, but if he already has a patch + tests it could be useful :-)

@vietj
Contributor

vietj commented May 24, 2018

it would be great to have a patch soon that we can backport to 3.5.2

@sophokles73
Contributor Author

providing a patch is easy, writing a test case probably isn't ...

@Sammers21
Contributor

You could try to connect and then instantly publish, repeated a large number of times; I think the right test would look something like that, but of course it will not guarantee that the code is not broken.

@sophokles73
Contributor Author

The more I think about it, the less I believe it is caused by missing synchronization or field visibility. The MQTT server's handlers all run on the same event loop, so the problem is probably more related to sending the CONNACK packet to the client via Netty (which probably sends it on a different thread than the event loop thread). In that case, simply setting the connected flag before sending the packet would probably already do the trick. WDYT?

@vietj
Contributor

vietj commented May 24, 2018

@sophokles73 I think the best would be a reproducer so we can check if that's a race condition or something else

@sophokles73
Contributor Author

@vietj By reproducer you mean a piece of code that demonstrates the problem (reliably)? If so, that is what I meant by

writing a test case probably isn't

How can I interrupt/pause the execution of MqttEndpointImpl.conack() after it has sent the CONNACK packet and before it sets the connected flag to true?

@vietj
Contributor

vietj commented May 25, 2018

yes, one that exhibits the failure

@sophokles73
Contributor Author

As indicated above, I have no clue how I could do that based on the existing code. Do you have any idea?

@sophokles73
Contributor Author

I am closing this issue because I am not able to create a reproducer and I am also no longer convinced that there exists a race condition (at least in theory, vert.x should prevent the situation described above from happening).

@vietj
Contributor

vietj commented Jun 7, 2018

thanks @sophokles73 don't hesitate to reopen if you have info

@sophokles73
Contributor Author

I have run into this issue again and I now also have the log output which includes the names of threads:

16:24:26.351 [main] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS0IT - running testUploadMessagesUsingShortTopicNames
Jun 11, 2018 4:24:26 PM io.vertx.mqtt.impl.MqttClientImpl
INFO: Connection with localhost:32931 established successfully
MQTT14:24:26.449 [vert.x-eventloop-thread-3] INFO  o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from registration/27b2ce55-7e45-4327-9e3b-5e7197f86fbf
MQTT14:24:26.594 [vert.x-eventloop-thread-2] INFO  o.e.h.s.m.LoggingConnectionEventProducer -    Connected - ID: dd3ca1bb-a6d1-4385-9712-b716f5f0d480, Protocol Adapter: hono-mqtt, Device: device [device-id: eb0dbf7a-cc4b-4469-8550-1e4a5a1d49f1, tenant-id: 27b2ce55-7e45-4327-9e3b-5e7197f86fbf], Data: null
MQTT14:24:26.611 [vert.x-eventloop-thread-0] ERROR io.vertx.core.impl.ContextImpl - Unhandled exception
MQTTjava.lang.IllegalStateException: Received an MQTT packet from a not connected client (CONNECT not sent yet)
MQTT	at io.vertx.mqtt.impl.MqttServerConnection.checkConnected(MqttServerConnection.java:408)
MQTT	at io.vertx.mqtt.impl.MqttServerConnection.handlePublish(MqttServerConnection.java:324)
MQTT	at io.vertx.mqtt.impl.MqttServerConnection.handleMessage(MqttServerConnection.java:136)
MQTT	at io.vertx.mqtt.impl.MqttServerImpl.lambda$null$3(MqttServerImpl.java:93)
MQTT	at io.vertx.core.net.impl.NetSocketImpl.handleMessageReceived(NetSocketImpl.java:351)
MQTT	at io.vertx.core.net.impl.NetServerImpl$2.handleMessage(NetServerImpl.java:446)
MQTT	at io.vertx.core.net.impl.NetServerImpl$2.handleMessage(NetServerImpl.java:443)
MQTT	at io.vertx.core.net.impl.VertxHandler.lambda$channelRead$1(VertxHandler.java:146)
MQTT	at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:337)
MQTT	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:195)
MQTT	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:144)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
MQTT	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
MQTT	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
MQTT	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
MQTT	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
MQTT	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
MQTT	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
MQTT	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
MQTT	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
MQTT	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
MQTT	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
MQTT	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
MQTT	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
MQTT	at java.lang.Thread.run(Thread.java:748)
MQTT14:24:26.612 [vert.x-eventloop-thread-0] INFO  o.e.h.s.m.LoggingConnectionEventProducer - Disconnected - ID: dd3ca1bb-a6d1-4385-9712-b716f5f0d480, Protocol Adapter: hono-mqtt, Device: device [device-id: eb0dbf7a-cc4b-4469-8550-1e4a5a1d49f1, tenant-id: 27b2ce55-7e45-4327-9e3b-5e7197f86fbf], Data: null
16:24:26.657 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS0IT - messages sent: 40
16:24:26.679 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS0IT - messages sent: 80
16:24:26.688 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS0IT - messages sent: 120
16:24:26.701 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS0IT - messages sent: 160
16:24:26.710 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS0IT - messages sent: 200
16:24:46.711 [main] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS0IT - sent 200 and received 0 messages in -1528727066599 milliseconds
16:24:46.712 [main] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS0IT - running on CI test environment, allowing for 100 percent of messages to be lost ...
16:24:46.715 [main] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS0IT - connection to MQTT adapter closed

This is a log of a client (a unit test case) interacting with a server (running in a Docker container). The server's log statements are prefixed with MQTT; the rest is from the client (and the test case). The most interesting lines are

MQTT14:24:26.449 [vert.x-eventloop-thread-3] INFO  o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from registration/27b2ce55-7e45-4327-9e3b-5e7197f86fbf
MQTT14:24:26.594 [vert.x-eventloop-thread-2] INFO  o.e.h.s.m.LoggingConnectionEventProducer -    Connected - ID: dd3ca1bb-a6d1-4385-9712-b716f5f0d480, Protocol Adapter: hono-mqtt, Device: device [device-id: eb0dbf7a-cc4b-4469-8550-1e4a5a1d49f1, tenant-id: 27b2ce55-7e45-4327-9e3b-5e7197f86fbf], Data: null
MQTT14:24:26.611 [vert.x-eventloop-thread-0] ERROR io.vertx.core.impl.ContextImpl - Unhandled exception

because they seem to support my original assumption that the publish handler in the MQTT server is not running on the same thread as the connection handler. The LoggingConnectionEventProducer produces its statement directly before the MqttEndpointImpl.accept(boolean) method is invoked and seems to be running on event loop thread 2. The publish handler then seems to be invoked on event loop thread 0, and from my point of view this could be the reason for the problem, i.e. we might indeed have a race condition between these two threads for access to the connected field.

The MQTT server is deployed as a Verticle, thus I believe that there is not much potential for starting the server wrongly, but I might be mistaken ...

@sophokles73 sophokles73 reopened this Jun 11, 2018
@vietj
Contributor

vietj commented Jun 11, 2018

can you provide a reproducer project ?

@vietj
Contributor

vietj commented Jun 11, 2018

are you running this in a verticle ? can you show the code that starts the server ?

@sophokles73
Contributor Author

can you provide a reproducer project ?

This is part of the integration test suite we are running for the Eclipse Hono project. The error does not occur reliably but instead it happens only once in a while (that's why I suspect a race condition).

@vietj
Contributor

vietj commented Jun 11, 2018

how can I reproduce it ?

@sophokles73
Contributor Author

are you running this in a verticle ?

The server is running as a verticle. The client is run as a junit test using the VertxUnitRunner. The server is started as a Docker container before the test is executed.

can you show the code that starts the server ?

The *bindInsecureMqttServer* method is invoked as part of the Verticle's start() method ...

    private Future<Void> bindInsecureMqttServer() {

        if (isInsecurePortEnabled()) {
            final MqttServerOptions options = new MqttServerOptions();
            options
                    .setHost(getConfig().getInsecurePortBindAddress())
                    .setPort(determineInsecurePort())
                    .setMaxMessageSize(getConfig().getMaxPayloadSize());

            return bindMqttServer(options, insecureServer).map(server -> {
                insecureServer = server;
                return (Void) null;
            }).recover(t -> {
                return Future.failedFuture(t);
            });
        } else {
            return Future.succeededFuture();
        }
    }

    private Future<MqttServer> bindMqttServer(final MqttServerOptions options, final MqttServer mqttServer) {

        final Future<MqttServer> result = Future.future();
        final MqttServer createdMqttServer = mqttServer == null ? MqttServer.create(this.vertx, options) : mqttServer;

        createdMqttServer
                .endpointHandler(this::handleEndpointConnection)
                .listen(done -> {

                    if (done.succeeded()) {
                        LOG.info("MQTT server running on {}:{}", getConfig().getBindAddress(),
                                createdMqttServer.actualPort());
                        result.complete(createdMqttServer);
                    } else {
                        LOG.error("error while starting up MQTT server", done.cause());
                        result.fail(done.cause());
                    }
                });
        return result;
    }

@sophokles73
Contributor Author

how can I reproduce it ?

You can check out the Eclipse Hono project and run the integration tests. However, there is no way to reliably reproduce the issue, i.e. you may or may not run into the problem ...

@vietj
Contributor

vietj commented Jun 11, 2018

can you give instruction here about the git repo to clone and the command to execute ?

@sophokles73
Contributor Author

Sure, the repo is https://github.com/eclipse/hono
Check out master and follow the Getting Started guide's instructions [1] to build locally.
Then run the relevant integration test by executing
mvn -Dit.test=TelemetryMqttQoS0IT verify -Prun-tests
from the tests folder

[1] https://www.eclipse.org/hono/getting-started/

@vietj
Contributor

vietj commented Jun 12, 2018

I've been able to install the project but when I try to run the test I get:

[ERROR] DOCKER> Unable to pull 'eclipse/hono-service-auth:0.7-M1-SNAPSHOT' : manifest for eclipse/hono-service-auth:0.7-M1-SNAPSHOT not found (Not Found: 404) [manifest for eclipse/hono-service-auth:0.7-M1-SNAPSHOT not found (Not Found: 404)]

@sophokles73
Contributor Author

Have you been following the Getting Started guide, and have you built the Docker images on your local Docker daemon?

@vietj
Contributor

vietj commented Jun 14, 2018

I'm able to run tests now, I will try until I see this in the console:

MQTT09:55:42.051 [vert.x-eventloop-thread-0] ERROR io.vertx.core.impl.ContextImpl - Unhandled exception
MQTTjava.lang.IllegalStateException: Received an MQTT packet from a not connected client (CONNECT not sent yet)
MQTT	at io.vertx.mqtt.impl.MqttServerConnection.checkConnected(MqttServerConnection.java:408)

does it make the test fail as well ?

@vietj vietj closed this as completed Jun 14, 2018
@vietj vietj reopened this Jun 14, 2018
@vietj
Contributor

vietj commented Jun 14, 2018

I've seen this failure after 3 runs:

[ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 6.156 s <<< FAILURE! - in org.eclipse.hono.tests.mqtt.TelemetryMqttQoS0IT
[ERROR] testUploadMessagesUsingShortTopicNames(org.eclipse.hono.tests.mqtt.TelemetryMqttQoS0IT)  Time elapsed: 2.325 s  <<< FAILURE!
java.lang.AssertionError: did not receive expected number of messages [expected: 180, received: 172]
	at org.eclipse.hono.tests.mqtt.TelemetryMqttQoS0IT.assertMessageReceivedRatio(TelemetryMqttQoS0IT.java:85)

[INFO] 
[INFO] Results:
[INFO] 
[ERROR] Failures: 
[ERROR]   TelemetryMqttQoS0IT>MqttTestBase.testUploadMessagesUsingShortTopicNames:219->MqttTestBase.doTestUploadMessages:282->assertMessageReceivedRatio:85 did not receive expected number of messages [expected: 180, received: 172]
[INFO] 
[ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0

is it related to this case ?

@vietj
Contributor

vietj commented Jun 14, 2018

you are saying that the MQTT prefixed log is from server however I can see:

MQTT14:24:26.449 [vert.x-eventloop-thread-3] INFO o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from registration/27b2ce55-7e45-4327-9e3b-5e7197f86fbf
MQTT14:24:26.594 [vert.x-eventloop-thread-2] INFO o.e.h.s.m.LoggingConnectionEventProducer - Connected - ID: dd3ca1bb-a6d1-4385-9712-b716f5f0d480, Protocol Adapter: hono-mqtt, Device: device [device-id: eb0dbf7a-cc4b-4469-8550-1e4a5a1d49f1, tenant-id: 27b2ce55-7e45-4327-9e3b-5e7197f86fbf], Data: null

this is from client, can you clarify ?

@sophokles73
Contributor Author

I've seen this failure after 3 runs:

[ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 6.156 s <<< FAILURE! - in org.eclipse.hono.tests.mqtt.TelemetryMqttQoS0IT
[ERROR] testUploadMessagesUsingShortTopicNames(org.eclipse.hono.tests.mqtt.TelemetryMqttQoS0IT) Time elapsed: 2.325 s <<< FAILURE!
java.lang.AssertionError: did not receive expected number of messages [expected: 180, received: 172]
at org.eclipse.hono.tests.mqtt.TelemetryMqttQoS0IT.assertMessageReceivedRatio(TelemetryMqttQoS0IT.java:85)

[INFO]
[INFO] Results:
[INFO]
[ERROR] Failures:
[ERROR] TelemetryMqttQoS0IT>MqttTestBase.testUploadMessagesUsingShortTopicNames:219->MqttTestBase.doTestUploadMessages:282->assertMessageReceivedRatio:85 did not receive expected number of messages [expected: 180, received: 172]
[INFO]
[ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0

is it related to this case ?

No, this happens when the client sends more messages (using QoS 0) during the test run than the server can handle at a given time. However, the test simply ignores this.

@sophokles73
Contributor Author

you are saying that the MQTT prefixed log is from server however I can see:

MQTT14:24:26.449 [vert.x-eventloop-thread-3] INFO o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from registration/27b2ce55-7e45-4327-9e3b-5e7197f86fbf
MQTT14:24:26.594 [vert.x-eventloop-thread-2] INFO o.e.h.s.m.LoggingConnectionEventProducer - Connected - ID: dd3ca1bb-a6d1-4385-9712-b716f5f0d480, Protocol Adapter: hono-mqtt, Device: device [device-id: eb0dbf7a-cc4b-4469-8550-1e4a5a1d49f1, tenant-id: 27b2ce55-7e45-4327-9e3b-5e7197f86fbf], Data: null

this is from client, can you clarify ?

No, all of this is from the MQTT server component. Hono consists of multiple (micro-)services. The MQTT adapter is one of them. When an MQTT client connects, it calls out to several other services to e.g. authenticate the client, assert the client's registration status and finally forward the published data to downstream consumers.

The interesting thing (again) is that the statements have been issued from different threads, isn't it? The MQTT server is deployed as a single Verticle, which starts the MQTT server and registers a connection handler and a publish handler. Shouldn't all these handlers then run on the same (single) thread, i.e. the one the verticle is running on?

@sophokles73
Contributor Author

In the meantime I have seen this happening twice again on our Travis instance. Once during the execution of the EventMqttIT (publishing messages using QoS 1):

[INFO] Running org.eclipse.hono.tests.mqtt.EventMqttIT
13:39:32.999 [vert.x-eventloop-thread-0] INFO  o.e.hono.client.impl.HonoClientImpl - closed connection to server [localhost:32768]
13:39:33.020 [main] INFO  o.e.hono.tests.mqtt.EventMqttIT - running testUploadMessagesUsingShortTopicNames
Jun 13, 2018 1:39:33 PM io.vertx.mqtt.impl.MqttClientImpl
INFO: Connection with localhost:32770 established successfully
MQTT13:39:33.105 [vert.x-eventloop-thread-0] INFO  o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from registration/aaa87c5d-0a5b-4d77-975d-07354da359bc
MQTT13:39:33.144 [vert.x-eventloop-thread-2] INFO  o.e.h.s.m.LoggingConnectionEventProducer -    Connected - ID: 2ba3767c-5759-44f3-84eb-42f1e36614a1, Protocol Adapter: hono-mqtt, Device: device [device-id: 7de41185-d0a4-4cfd-b2fa-5495005db331, tenant-id: aaa87c5d-0a5b-4d77-975d-07354da359bc], Data: null
13:39:33.748 [vert.x-eventloop-thread-3] INFO  o.e.hono.tests.mqtt.EventMqttIT - messages sent: 40
13:39:33.751 [vert.x-eventloop-thread-3] INFO  o.e.hono.tests.mqtt.EventMqttIT - messages received: 40
13:39:34.082 [vert.x-eventloop-thread-3] INFO  o.e.hono.tests.mqtt.EventMqttIT - messages sent: 80
13:39:34.086 [vert.x-eventloop-thread-3] INFO  o.e.hono.tests.mqtt.EventMqttIT - messages received: 80
13:39:34.460 [vert.x-eventloop-thread-3] INFO  o.e.hono.tests.mqtt.EventMqttIT - messages received: 120
13:39:34.462 [vert.x-eventloop-thread-3] INFO  o.e.hono.tests.mqtt.EventMqttIT - messages sent: 120
13:39:34.752 [vert.x-eventloop-thread-3] INFO  o.e.hono.tests.mqtt.EventMqttIT - messages received: 160
13:39:34.753 [vert.x-eventloop-thread-3] INFO  o.e.hono.tests.mqtt.EventMqttIT - messages sent: 160
13:39:35.008 [vert.x-eventloop-thread-3] INFO  o.e.hono.tests.mqtt.EventMqttIT - messages received: 200
13:39:35.010 [vert.x-eventloop-thread-3] INFO  o.e.hono.tests.mqtt.EventMqttIT - messages sent: 200
13:39:35.010 [main] INFO  o.e.hono.tests.mqtt.EventMqttIT - sent 200 and received 200 messages in 1863 milliseconds
13:39:35.011 [main] INFO  o.e.hono.tests.mqtt.EventMqttIT - connection to MQTT adapter closed
MQTT13:39:35.021 [vert.x-eventloop-thread-0] INFO  o.e.h.s.m.LoggingConnectionEventProducer - Disconnected - ID: 2ba3767c-5759-44f3-84eb-42f1e36614a1, Protocol Adapter: hono-mqtt, Device: device [device-id: 7de41185-d0a4-4cfd-b2fa-5495005db331, tenant-id: aaa87c5d-0a5b-4d77-975d-07354da359bc], Data: null
13:39:35.056 [main] INFO  o.e.hono.tests.mqtt.EventMqttIT - running testUploadMessages
Jun 13, 2018 1:39:35 PM io.vertx.mqtt.impl.MqttClientImpl
INFO: Connection with localhost:32770 established successfully
MQTT13:39:35.132 [vert.x-eventloop-thread-0] INFO  o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from registration/d4550a31-b621-4894-8f73-15704562e312
MQTT13:39:35.168 [vert.x-eventloop-thread-2] INFO  o.e.h.s.m.LoggingConnectionEventProducer -    Connected - ID: 82ba743a-796c-4ec3-a7f7-600d74a7b347, Protocol Adapter: hono-mqtt, Device: device [device-id: c501eee7-4dc9-4bae-8554-b394cd837909, tenant-id: d4550a31-b621-4894-8f73-15704562e312], Data: null
MQTT13:39:35.178 [vert.x-eventloop-thread-0] ERROR io.vertx.core.impl.ContextImpl - Unhandled exception
MQTTjava.lang.IllegalStateException: Received an MQTT packet from a not connected client (CONNECT not sent yet)
MQTT	at io.vertx.mqtt.impl.MqttServerConnection.checkConnected(MqttServerConnection.java:408)
MQTT	at io.vertx.mqtt.impl.MqttServerConnection.handlePublish(MqttServerConnection.java:324)
MQTT	at io.vertx.mqtt.impl.MqttServerConnection.handleMessage(MqttServerConnection.java:136)
MQTT	at io.vertx.mqtt.impl.MqttServerImpl.lambda$null$3(MqttServerImpl.java:93)
MQTT	at io.vertx.core.net.impl.NetSocketImpl.handleMessageReceived(NetSocketImpl.java:351)
MQTT	at io.vertx.core.net.impl.NetServerImpl$2.handleMessage(NetServerImpl.java:446)
MQTT	at io.vertx.core.net.impl.NetServerImpl$2.handleMessage(NetServerImpl.java:443)
MQTT	at io.vertx.core.net.impl.VertxHandler.lambda$channelRead$1(VertxHandler.java:146)
MQTT	at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:337)
MQTT	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:195)
MQTT	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:144)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
MQTT	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
MQTT	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
MQTT	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
MQTT	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
MQTT	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
MQTT	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
MQTT	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
MQTT	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
MQTT	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
MQTT	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
MQTT	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
MQTT	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
MQTT	at java.lang.Thread.run(Thread.java:748)
MQTT13:39:35.180 [vert.x-eventloop-thread-0] INFO  o.e.h.s.m.LoggingConnectionEventProducer - Disconnected - ID: 82ba743a-796c-4ec3-a7f7-600d74a7b347, Protocol Adapter: hono-mqtt, Device: device [device-id: c501eee7-4dc9-4bae-8554-b394cd837909, tenant-id: d4550a31-b621-4894-8f73-15704562e312], Data: null

and again during execution of the TelemetryMqttQoS1IT (publishing telemetry messages using QoS 1):

13:15:43.065 [main] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - running testUploadMessagesUsingShortTopicNames
Jun 16, 2018 1:15:43 PM io.vertx.mqtt.impl.MqttClientImpl
INFO: Connection with localhost:32773 established successfully
MQTT13:15:43.680 [vert.x-eventloop-thread-1] INFO  o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from tenant
MQTT13:15:43.944 [vert.x-eventloop-thread-0] INFO  o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from registration/3dba55bf-c476-410d-af96-28ee8bd85353
MQTT13:15:44.133 [vert.x-eventloop-thread-2] INFO  o.e.h.s.m.LoggingConnectionEventProducer -    Connected - ID: 8e92b337-0dd7-4f9c-b7e7-26a91d8664e0, Protocol Adapter: hono-mqtt, Device: device [device-id: c0a75552-f7b3-426b-8243-b93d37409d62, tenant-id: 3dba55bf-c476-410d-af96-28ee8bd85353], Data: null
13:15:44.693 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - messages received: 40
13:15:44.701 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - messages sent: 40
13:15:44.994 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - messages received: 80
13:15:44.997 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - messages sent: 80
13:15:45.209 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - messages received: 120
13:15:45.215 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - messages sent: 120
13:15:45.392 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - messages received: 160
13:15:45.394 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - messages sent: 160
13:15:45.617 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - messages received: 200
13:15:45.619 [vert.x-eventloop-thread-0] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - messages sent: 200
13:15:45.620 [main] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - sent 200 and received 200 messages in 1476 milliseconds
13:15:45.622 [main] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - connection to MQTT adapter closed
MQTT13:15:45.624 [vert.x-eventloop-thread-0] INFO  o.e.h.s.m.LoggingConnectionEventProducer - Disconnected - ID: 8e92b337-0dd7-4f9c-b7e7-26a91d8664e0, Protocol Adapter: hono-mqtt, Device: device [device-id: c0a75552-f7b3-426b-8243-b93d37409d62, tenant-id: 3dba55bf-c476-410d-af96-28ee8bd85353], Data: null
13:15:45.659 [main] INFO  o.e.h.tests.mqtt.TelemetryMqttQoS1IT - running testUploadMessages
Jun 16, 2018 1:15:45 PM io.vertx.mqtt.impl.MqttClientImpl
INFO: Connection with localhost:32773 established successfully
MQTT13:15:45.700 [vert.x-eventloop-thread-0] INFO  o.e.h.c.i.AbstractRequestResponseClient - enabling caching of responses from registration/64cbaf8b-181e-4431-a185-4a825b2be214
MQTT13:15:45.724 [vert.x-eventloop-thread-2] INFO  o.e.h.s.m.LoggingConnectionEventProducer -    Connected - ID: 8967aa96-51ab-4ee7-a128-61696097789f, Protocol Adapter: hono-mqtt, Device: device [device-id: 8693f84c-7d27-4704-bbed-3e058bb47452, tenant-id: 64cbaf8b-181e-4431-a185-4a825b2be214], Data: null
MQTT13:15:45.730 [vert.x-eventloop-thread-0] ERROR io.vertx.core.impl.ContextImpl - Unhandled exception
MQTTjava.lang.IllegalStateException: Received an MQTT packet from a not connected client (CONNECT not sent yet)
MQTT	at io.vertx.mqtt.impl.MqttServerConnection.checkConnected(MqttServerConnection.java:408)
MQTT	at io.vertx.mqtt.impl.MqttServerConnection.handlePublish(MqttServerConnection.java:324)
MQTT	at io.vertx.mqtt.impl.MqttServerConnection.handleMessage(MqttServerConnection.java:136)
MQTT	at io.vertx.mqtt.impl.MqttServerImpl.lambda$null$3(MqttServerImpl.java:93)
MQTT	at io.vertx.core.net.impl.NetSocketImpl.handleMessageReceived(NetSocketImpl.java:351)
MQTT	at io.vertx.core.net.impl.NetServerImpl$2.handleMessage(NetServerImpl.java:446)
MQTT	at io.vertx.core.net.impl.NetServerImpl$2.handleMessage(NetServerImpl.java:443)
MQTT	at io.vertx.core.net.impl.VertxHandler.lambda$channelRead$1(VertxHandler.java:146)
MQTT	at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:337)
MQTT	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:195)
MQTT	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:144)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
MQTT	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
MQTT	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
MQTT	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
MQTT	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
MQTT	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
MQTT	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
MQTT	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
MQTT	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
MQTT	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
MQTT	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
MQTT	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
MQTT	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
MQTT	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
MQTT	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
MQTT	at java.lang.Thread.run(Thread.java:748)
MQTT13:15:45.736 [vert.x-eventloop-thread-0] INFO  o.e.h.s.m.LoggingConnectionEventProducer - Disconnected - ID: 8967aa96-51ab-4ee7-a128-61696097789f, Protocol Adapter: hono-mqtt, Device: device [device-id: 8693f84c-7d27-4704-bbed-3e058bb47452, tenant-id: 64cbaf8b-181e-4431-a185-4a825b2be214], Data: null

sophokles73 (Contributor Author) commented Jun 18, 2018

@vietj

Do we agree that the different handlers registered when starting the MQTT server should actually all be running on the same thread? Maybe we are doing something wrong in registering the handlers? Or is there in fact a problem in how MqttServerImpl registers the handlers with netty?

sophokles73 (Contributor Author)

@vietj any news on this?

vietj (Contributor) commented Jul 4, 2018

No time to look at it yet, unfortunately :-( . I'll do my best to have a look.

vietj (Contributor) commented Jul 4, 2018

A simple reproducer would make things easier.

sophokles73 (Contributor Author)

@vietj can you at least state your opinion on my latest comment?

Do we agree that the different handlers registered when starting the MQTT server should actually all be running on the same thread? Maybe we are doing something wrong in registering the handlers? Or is there in fact a problem in how MqttServerImpl registers the handlers with netty?

vietj (Contributor) commented Jul 4, 2018

yes @sophokles73

As far as I remember, when I looked at the code I found callbacks that were registering things in vertx from an external thread (coming from Spring). They should instead run on the verticle context when registering things, to avoid spreading the work across several event loops. For instance:

// Another non vertx thread

// Get the verticle context
Context ctx = verticle.getContext();

// Run on event loop
ctx.runOnContext(v -> {
  // perform registration using the right thread
});

you can read that to understand more https://github.com/vietj/vertx-materials/blob/master/src/main/asciidoc/output/Demystifying_the_event_loop.adoc
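To make the threading rule concrete, here is a self-contained sketch of the same pattern using only java.util.concurrent (not the Vert.x API; the class name and the single-threaded executor standing in for the verticle's event loop are illustrative assumptions): the external thread never performs the registration itself, it only submits the work to the one "context" thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative stand-in for runOnContext: a single-threaded executor
// plays the role of the verticle's event loop.
public class RunOnContextSketch {

    static String registerOnContext() throws InterruptedException {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "event-loop"));
        AtomicReference<String> registrationThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // the "external" (e.g. Spring) thread does not touch shared
        // state itself; it hands the registration to the context thread
        new Thread(() -> eventLoop.execute(() -> {
            registrationThread.set(Thread.currentThread().getName());
            done.countDown();
        }), "external").start();

        done.await();
        eventLoop.shutdown();
        // the registration ran on the event-loop thread, regardless
        // of which thread submitted it
        return registrationThread.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("registration ran on: " + registerOnContext());
    }
}
```

Because a single thread executes all submitted tasks in order, state touched only from that thread needs no further synchronization.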

vietj (Contributor) commented Jul 4, 2018

so you want to make sure that all registrations are done using the correct event loop

sophokles73 (Contributor Author)

this sounds very reasonable and in fact might already be the solution to our problem. I will check and see if I can make sure to register the handlers all on the same (event loop) thread and let you know ...

sophokles73 (Contributor Author)

@vietj ok, the problem is indeed related to the fact that the connection handler that we register for the MqttServer invokes other services which run on a different event-loop thread. When such a service completes the result future held by the connection handler, the CONNACK packet is sent on that other service's context (and underlying thread), and the MqttEndpoint's connected flag is set to true on that thread as well. The first PUBLISH packet sent by the device is then handled on the MqttServer's context again (which is different from the context that the CONNACK has been sent on). In this case a race condition may occur because the thread handling the PUBLISH may not (yet) see the updated value of the connected flag.
One might argue that it is not a good idea to switch contexts in the handlers registered on the MqttServer; however, I believe that simply declaring the connected field as volatile would already fix this problem. WDYT?
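A minimal sketch of the kind of fix being debated here (this is an illustrative class, not the actual MqttEndpointImpl source): guarding the connected flag with synchronized accessors, which is the convention vietj mentions below, establishes the same happens-before visibility that a volatile field would, so the thread handling the PUBLISH is guaranteed to see the value written by the thread that sent the CONNACK.

```java
// Illustrative endpoint state holder, not the real vertx-mqtt code.
public class EndpointState {

    private boolean connected; // guarded by 'this'

    // called by the thread that has just written the CONNACK to the wire
    public synchronized void setConnected(boolean connected) {
        this.connected = connected;
    }

    // called by the event-loop thread handling subsequent packets
    public synchronized boolean isConnected() {
        return connected;
    }

    // mirrors the failing checkConnected() from the stack trace above:
    // without synchronized (or volatile), this thread could still observe
    // connected == false after another thread has set it to true
    public synchronized void checkConnected() {
        if (!connected) {
            throw new IllegalStateException(
                "Received an MQTT packet from a not connected client");
        }
    }
}
```

Either synchronized or volatile closes the visibility gap; the choice is a matter of the project's existing locking conventions.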

vietj (Contributor) commented Jul 6, 2018

@sophokles73 can you provide a simple reproducer case based on this reasoning? In general we don't use volatile but instead synchronize on the connection; that's why a reproducer would help me better understand the case.

sophokles73 (Contributor Author)

@vietj I have created #103 which contains a (failing) test case which tries to reproduce the problem.

vietj (Contributor) commented Jul 6, 2018

awesome, I believe we can get a fix for 3.5.3

vietj (Contributor) commented Jul 10, 2018

I think the test you created is incorrect because the check has endpoint set to false in

    if ((this.endpoint != null) && (this.endpoint.isConnected())) {
      return true;
    } else {
      so.close();
      throw new IllegalStateException("Received an MQTT packet from a not connected client (CONNECT not sent yet)");
    }

so it does not really reproduce what we are trying to show.

vietj (Contributor) commented Jul 10, 2018

I've been trying to reproduce this bug with a real case and it's quite impossible.

I see that when accept(false) is called it sends a network message to the client and sets the connected boolean to true.

The issue Received an MQTT packet from a not connected client arises when the packet arrives while the boolean is still false, which is very hard to reproduce because between the time the CONNACK is sent and the next packet from the client (the PUBLISH msg) is received, there is just a lot of time.

vietj (Contributor) commented Jul 10, 2018

I've also tried running the hono reproducer a lot of times and can't get it reproduced.

So for 3.5.3 I will fix the synchronisation, which should fix the issue even though it is hard to reproduce.

vietj (Contributor) commented Jul 10, 2018

Actually, I was able to reproduce it once. I will still work on the same fix and see if I can run the tests many times without reproducing it.

sophokles73 (Contributor Author)

I think the test you created is incorrect because the check has endpoint set to false in

Do you mean endpoint is null, or that endpoint.isConnected() returns false? The latter is actually what the test case is trying to illustrate, i.e. that the value of the connected field, which has been updated in the thread that sent the CONNACK, is not (yet) visible in the thread that is handling the PUBLISH.

I agree that synchronizing access to the connected field should solve this problem. Have you checked if the test case then succeeds?

vietj (Contributor) commented Jul 10, 2018

null as far as I remember

I'm currently running IT tests intensively to check the fix.

vietj (Contributor) commented Jul 10, 2018

Can you test the 3.5 branch on your side if I push the fix?

sophokles73 (Contributor Author)

I can use it for doing development and see if the problem occurs again when running integration tests. However, I will also take a look at the unit test.

vietj (Contributor) commented Jul 10, 2018

sure thing
