From 7c180f83462ef963547bb064eee52290219a705d Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Tue, 23 Feb 2021 21:56:29 +0100 Subject: [PATCH] DPL: fix multithreading issues in WS driver More fixes to avoid Monitoring reporting and WS driver race conditions. --- Framework/Core/src/DPLWebSocket.cxx | 1 + Framework/Core/src/DPLWebSocket.h | 5 +++-- Framework/Core/src/WSDriverClient.cxx | 12 +++++++----- Framework/Core/src/WSDriverClient.h | 7 ++++++- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/Framework/Core/src/DPLWebSocket.cxx b/Framework/Core/src/DPLWebSocket.cxx index 49826d4dd9c9b..9edb467689312 100644 --- a/Framework/Core/src/DPLWebSocket.cxx +++ b/Framework/Core/src/DPLWebSocket.cxx @@ -13,6 +13,7 @@ #include "Framework/DeviceSpec.h" #include "HTTPParser.h" #include +#include #include #include #include diff --git a/Framework/Core/src/DPLWebSocket.h b/Framework/Core/src/DPLWebSocket.h index 777acd6b3ceeb..952bd82ad6c91 100644 --- a/Framework/Core/src/DPLWebSocket.h +++ b/Framework/Core/src/DPLWebSocket.h @@ -16,6 +16,7 @@ #include #include #include +#include class uv_stream_s; @@ -70,11 +71,11 @@ struct WSDPLClient : public HTTPParser { /// Dump headers void dumpHeaders(); void sendHandshake(); - bool isConnected() { return mHandshaken; } + bool isHandshaken() { return mHandshaken; } std::string mNonce; DeviceSpec const& mSpec; - bool mHandshaken = false; + std::atomic mHandshaken = false; std::function mHandshake; uv_stream_t* mStream = nullptr; std::map mHeaders; diff --git a/Framework/Core/src/WSDriverClient.cxx b/Framework/Core/src/WSDriverClient.cxx index 6776b667cae4e..06ecd04d473a2 100644 --- a/Framework/Core/src/WSDriverClient.cxx +++ b/Framework/Core/src/WSDriverClient.cxx @@ -28,6 +28,7 @@ void on_connect(uv_connect_t* connection, int status) auto onHandshake = [client]() { client->flushPending(); }; + std::lock_guard lock(client->mutex()); client->setDPLClient(std::make_unique(connection->handle, client->spec(), onHandshake)); client->sendHandshake(); } @@ -62,6 +63,7 @@ void sendMessageToDriver(std::unique_ptr& client, ch void WSDriverClient::setDPLClient(std::unique_ptr client) { mClient = std::move(client); + mConnected = true; } void WSDriverClient::sendHandshake() @@ -76,23 +78,23 @@ void WSDriverClient::observe(const char*, std::function) void WSDriverClient::tell(const char* msg, size_t s, bool flush) { - static std::mutex tellMutex; - std::lock_guard guard(tellMutex); - static bool printed1 = false; static bool printed2 = false; - if (mClient && mClient->isConnected() && flush) { + if (mConnected && mClient->isHandshaken() && flush) { flushPending(); + std::lock_guard lock(mClientMutex); std::vector outputs; encode_websocket_frames(outputs, msg, s, WebSocketOpCode::Binary, 0); mClient->write(outputs); } else { + std::lock_guard lock(mClientMutex); encode_websocket_frames(mBacklog, msg, s, WebSocketOpCode::Binary, 0); } } void WSDriverClient::flushPending() { + std::lock_guard lock(mClientMutex); static bool printed1 = false; static bool printed2 = false; if (!mClient) { @@ -104,7 +106,7 @@ void WSDriverClient::flushPending() } return; } - if (!(mClient->isConnected())) { + if (!(mClient->isHandshaken())) { if (mBacklog.size() > 1000) { if (!printed2) { LOG(WARNING) << "Unable to communicate with driver because client is not connected"; diff --git a/Framework/Core/src/WSDriverClient.h b/Framework/Core/src/WSDriverClient.h index b817a3d1ea4b1..b4314584168ab 100644 --- a/Framework/Core/src/WSDriverClient.h +++ b/Framework/Core/src/WSDriverClient.h @@ -16,6 +16,8 @@ #include #include #include +#include +#include typedef struct uv_connect_s uv_connect_t; @@ -42,10 +44,13 @@ class WSDriverClient : public DriverClient DeviceSpec const& spec() { return mSpec; } // Initiate a websocket session void sendHandshake(); + std::mutex& mutex() { return mClientMutex; } private: + // Whether or not we managed to connect. + std::atomic mConnected = false; + std::mutex mClientMutex; DeviceSpec const& mSpec; - bool mConnected = false; std::vector mBacklog; uv_connect_t* mConnection = nullptr; std::unique_ptr mClient;