Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix MQTT plugin (#1006, #1008) #1009

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 143 additions & 19 deletions plotjuggler_plugins/DataStreamMQTT/mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@
#include <QDebug>
#include <QMessageBox>
#include <QString>
#include <QtGlobal>
#include <QLoggingCategory>

#ifdef WIN32
#include <windows.h>
#include <strsafe.h>
#endif

#define MQTT_DEBUG 0
#define debug() qCDebug(category)

static const QLoggingCategory category("MQTTClient");

void connect_callback(struct mosquitto* mosq, void* context, int result, int,
const mosquitto_property*)
{
Expand Down Expand Up @@ -47,16 +54,39 @@ void message_callback(struct mosquitto* mosq, void* context,
self->onMessageReceived(message);
}

void log_callback(struct mosquitto* mosq, void* context, int log_level, const char* msg)
{
const std::pair<int, const char*> log_level_map[] = {
{ MOSQ_LOG_INFO, "MOSQ_LOG_INFO" },
{ MOSQ_LOG_NOTICE, "MOSQ_LOG_NOTICE" },
{ MOSQ_LOG_WARNING, "MOSQ_LOG_WARNING" },
{ MOSQ_LOG_ERR, "MOSQ_LOG_ERR " },
{ MOSQ_LOG_DEBUG, "MOSQ_LOG_DEBUG" },
{ MOSQ_LOG_SUBSCRIBE, "MOSQ_LOG_SUBSCRIBE" },
{ MOSQ_LOG_UNSUBSCRIBE, "MOSQ_LOG_UNSUBSCRIBE" },
{ MOSQ_LOG_WEBSOCKETS, "MOSQ_LOG_WEBSOCKETS" },
};

const auto it =
std::find_if(std::begin(log_level_map), std::end(log_level_map),
[log_level](const auto& pair) { return log_level == pair.first; });
if (it == std::end(log_level_map))
return;

debug() << it->second << ": " << msg;
}

//----------------------------

MQTTClient::MQTTClient()
{
mosquitto_lib_init();
_mosq = mosquitto_new(nullptr, true, this);

mosquitto_connect_v5_callback_set(_mosq, connect_callback);
mosquitto_disconnect_callback_set(_mosq, disconnect_callback);
mosquitto_message_v5_callback_set(_mosq, message_callback);
#if MQTT_DEBUG
int major = 0, minor = 0, revision = 0;
mosquitto_lib_version(&major, &minor, &revision);
debug() << "mosquitto version: " << major << "." << minor << "." << revision;
#endif // MQTT_DEBUG
}

MQTTClient::~MQTTClient()
Expand All @@ -75,13 +105,53 @@ bool MQTTClient::connect(const MosquittoConfig& config)
disconnect();
}

mosquitto_int_option(_mosq, MOSQ_OPT_PROTOCOL_VERSION, config.protocol_version);
// Start with a fresh mosquitto instance.
Q_ASSERT(_mosq == nullptr);
_mosq = mosquitto_new(nullptr, true, this);

bool success = configureMosquitto(config);
if (!success)
{
mosquitto_destroy(_mosq);
_mosq = nullptr;
return false;
}

_connected = true;
_config = config;
return true;
}

bool MQTTClient::configureMosquitto(const MosquittoConfig& config)
{
mosquitto_connect_v5_callback_set(_mosq, connect_callback);
mosquitto_disconnect_callback_set(_mosq, disconnect_callback);
mosquitto_message_v5_callback_set(_mosq, message_callback);
#if MQTT_DEBUG
mosquitto_log_callback_set(_mosq, log_callback);
#endif

int rc =
mosquitto_int_option(_mosq, MOSQ_OPT_PROTOCOL_VERSION, config.protocol_version);
if (rc != MOSQ_ERR_SUCCESS)
{
QMessageBox::warning(nullptr, "MQTT Client", QString("MQTT initialization failed."),
QMessageBox::Ok);
debug() << "MQTT initialization failed:" << mosquitto_strerror(rc);
return false;
}

if ((!config.username.empty() || !config.password.empty()))
{
if (mosquitto_username_pw_set(_mosq, config.username.c_str(),
config.password.c_str()))
rc = mosquitto_username_pw_set(_mosq, config.username.c_str(),
config.password.c_str());
if (rc != MOSQ_ERR_SUCCESS)
{
QMessageBox::warning(nullptr, "MQTT Client",
QString("MQTT initialization failed. Double check username "
"and password."),
QMessageBox::Ok);
debug() << "MQTT username or password error:" << mosquitto_strerror(rc);
return false;
}
}
Expand All @@ -91,16 +161,31 @@ bool MQTTClient::connect(const MosquittoConfig& config)
const char* cafile = config.cafile.c_str();
const char* certfile = config.certfile.empty() ? nullptr : config.certfile.c_str();
const char* keyfile = config.keyfile.empty() ? nullptr : config.keyfile.c_str();

mosquitto_tls_set(_mosq, cafile, nullptr, certfile, keyfile, nullptr);
rc = mosquitto_tls_set(_mosq, cafile, nullptr, certfile, keyfile, nullptr);
if (rc != MOSQ_ERR_SUCCESS)
{
QMessageBox::warning(nullptr, "MQTT Client",
QString("MQTT initialization failed. Double check "
"certificates."),
QMessageBox::Ok);
debug() << "MQTT certificate error:" << mosquitto_strerror(rc);
return false;
}
}

mosquitto_max_inflight_messages_set(_mosq, config.max_inflight);
rc = mosquitto_max_inflight_messages_set(_mosq, config.max_inflight);
if (rc != MOSQ_ERR_SUCCESS)
{
QMessageBox::warning(nullptr, "MQTT Client", QString("MQTT initialization failed."),
QMessageBox::Ok);
debug() << "MQTT setting max inflight messages failed:" << mosquitto_strerror(rc);
return false;
}

const mosquitto_property* properties = nullptr; // todo

int rc = mosquitto_connect_bind_v5(_mosq, config.host.c_str(), config.port,
config.keepalive, nullptr, properties);
rc = mosquitto_connect_bind_v5(_mosq, config.host.c_str(), config.port,
config.keepalive, nullptr, properties);
// TODO bind
if (rc > 0)
{
Expand All @@ -121,13 +206,38 @@ bool MQTTClient::connect(const MosquittoConfig& config)
QString("Unable to connect (%1)").arg(mosquitto_strerror(rc)),
QMessageBox::Ok);
}
_connected = false;
debug() << "MQTT connect failed:" << mosquitto_strerror(rc);
return false;
}

_connected = true;
_config = config;
mosquitto_loop_start(_mosq);
rc = mosquitto_loop_start(_mosq);
if (rc == MOSQ_ERR_NOT_SUPPORTED)
{
// Threaded mode may not be supported on windows (libmosquitto < 2.1).
// See https://github.com/eclipse/mosquitto/issues/2707
Q_ASSERT(_thread == nullptr);
_thread = new std::thread([this]() {
int rc = mosquitto_loop_forever(this->_mosq, -1, 1);
if (rc != MOSQ_ERR_SUCCESS)
{
debug() << "MQTT loop forever failed:" << mosquitto_strerror(rc);
}
});
if (_thread == nullptr)
{
QMessageBox::warning(nullptr, "MQTT Client", QString("Failed to start MQTT client"),
QMessageBox::Ok);
debug() << "MQTT start failed: could not allocate memory.";
return false;
}
}
else if (rc != MOSQ_ERR_SUCCESS)
{
QMessageBox::warning(nullptr, "MQTT Client", QString("Failed to start MQTT client"),
QMessageBox::Ok);
debug() << "MQTT start loop failed:" << mosquitto_strerror(rc);
return false;
}
return true;
}

Expand All @@ -137,6 +247,14 @@ void MQTTClient::disconnect()
{
mosquitto_disconnect(_mosq);
mosquitto_loop_stop(_mosq, true);
if (_thread != nullptr)
{
_thread->join();
delete _thread;
_thread = nullptr;
}
mosquitto_destroy(_mosq);
_mosq = nullptr;
}
_connected = false;
_topics_set.clear();
Expand Down Expand Up @@ -181,10 +299,16 @@ std::unordered_set<std::string> MQTTClient::getTopicList()

void MQTTClient::subscribe(const std::string& topic, int qos)
{
mosquitto_subscribe(_mosq, nullptr, topic.c_str(), qos);
if (_connected)
{
mosquitto_subscribe(_mosq, nullptr, topic.c_str(), qos);
}
}

void MQTTClient::unsubscribe(const std::string& topic)
{
mosquitto_unsubscribe(_mosq, nullptr, topic.c_str());
}
if (_connected)
{
mosquitto_unsubscribe(_mosq, nullptr, topic.c_str());
}
}
4 changes: 4 additions & 0 deletions plotjuggler_plugins/DataStreamMQTT/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "mosquitto_config.h"
#include <string>
#include <functional>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <mutex>
Expand Down Expand Up @@ -46,11 +47,14 @@ class MQTTClient : public QObject
void disconnected();

private:
bool configureMosquitto(const MosquittoConfig& config);

mosquitto* _mosq = nullptr;
std::unordered_map<std::string, TopicCallback> _message_callbacks;
std::unordered_set<std::string> _topics_set;
std::mutex _mutex;
MosquittoConfig _config;
std::thread* _thread;
};

#endif // MQTT_CLIENT_H