1.paho.mqtt.c库编译
# Fetch the Paho MQTT C client sources.
git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c
# -p keeps the step repeatable when the build directory already exists.
mkdir -p build
cd build
# Install into ../out; the quotes keep the prefix intact if the path contains spaces.
cmake -DCMAKE_INSTALL_PREFIX="$PWD/../out" ..
make && make install
这样就编译好了, 输出在 out 目录, 目录结构如下:
├── bin
│ └── MQTTVersion
├── include
│ ├── MQTTAsync.h
│ ├── MQTTClient.h
│ ├── MQTTClientPersistence.h
│ ├── MQTTExportDeclarations.h
│ ├── MQTTProperties.h
│ ├── MQTTReasonCodes.h
│ └── MQTTSubscribeOpts.h
├── lib
│ ├── cmake
│ │ └── eclipse-paho-mqtt-c
│ │ ├── eclipse-paho-mqtt-cConfig.cmake
│ │ ├── eclipse-paho-mqtt-cConfig-noconfig.cmake
│ │ └── eclipse-paho-mqtt-cConfigVersion.cmake
│ ├── libpaho-mqtt3a.so -> libpaho-mqtt3a.so.1
│ ├── libpaho-mqtt3a.so.1 -> libpaho-mqtt3a.so.1.3.6
│ ├── libpaho-mqtt3a.so.1.3.6
│ ├── libpaho-mqtt3c.so -> libpaho-mqtt3c.so.1
│ ├── libpaho-mqtt3c.so.1 -> libpaho-mqtt3c.so.1.3.6
│ └── libpaho-mqtt3c.so.1.3.6
2.封装一个client同时支持pub和sub
paho-mqtt 提供的例子把 pub 和 sub 分开实现。这里我们基于 C++11 做个简单的封装, 使用 hash map 保存订阅的 topic 以及收到该 topic 消息时的回调。
// mqtt_client.h
#ifndef MQTT_CLIENT_H_
#define MQTT_CLIENT_H_

#include <stddef.h> /* size_t, so the header compiles without relying on earlier includes */

#ifdef __cplusplus
extern "C" {
#endif

/** Opaque handle to an MQTT client instance. */
typedef struct __mqtt_client * mqtt_client;

/**
 * @brief Create an MQTT client.
 *
 * @param path MQTT server address; NULL selects the default host on port 1883
 * @param id   client id
 *
 * @return MQTT handle
 */
mqtt_client mqtt_client_create(const char* path, const char* id);

/**
 * @brief Release an MQTT client.
 *
 * @param client MQTT handle
 *
 * @return 0 on success, non-zero on failure
 */
int mqtt_client_destroy(mqtt_client client);

/**
 * @brief Add a subscription handler to the client's subscription list.
 *
 * @param client    MQTT handle
 * @param topic     topic name
 * @param qos       QoS used when subscribing to the topic
 * @param func      function that handles messages received on the topic
 * @param user_data opaque pointer passed back to func
 */
void mqtt_client_sub_list_push(mqtt_client client, const char* topic, int qos, int(*func)(mqtt_client client, const char* playload, size_t len, int qos, void* user_data), void* user_data);

/**
 * @brief Remove a subscription handler from the client's subscription list.
 *
 * @param client MQTT handle
 * @param topic  topic name
 */
void mqtt_client_sub_list_pop(mqtt_client client, const char* topic);

/**
 * @brief Subscribe to the topics in the subscription list.
 *
 * @param client MQTT handle
 *
 * @return 0 on success, non-zero on failure
 */
int mqtt_client_sub(mqtt_client client);

/**
 * @brief Unsubscribe from a topic.
 *
 * @param client MQTT handle
 * @param topic  topic name
 *
 * @return 0 on success, non-zero on failure
 */
int mqtt_client_unsub(mqtt_client client, const char* topic);

/**
 * @brief Publish a message.
 *
 * @param client       MQTT handle
 * @param topic        topic name
 * @param playload     payload bytes
 * @param playload_len payload length in bytes
 *
 * @return 0 on success, non-zero on failure
 */
int mqtt_client_pub(mqtt_client client, const char* topic, const char* playload, size_t playload_len);

#ifdef __cplusplus
}
#endif
#endif // MQTT_CLIENT_H_
#include "paho_mqtt/MQTTClient.h"
#include "mqtt_client/mqtt_client.h"
#include <limits.h>
#include <stdio.h>
#include <string.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <mutex>
#include <new>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
using mqtt_msg_cb = std::function<int(mqtt_client client, const char* payload, size_t len, int qos, void* user_data)>;
struct __mqtt_client {
__mqtt_client() {
mqtt = NULL;
conn_opts = MQTTClient_connectOptions_initializer;
is_running = false;
}
MQTTClient mqtt;
MQTTClient_connectOptions conn_opts;
bool is_running;
std::unordered_map<std::string, std::tuple<int, mqtt_msg_cb, void*>> sub_list;
std::thread thread;
};
/// Server address used when the caller passes no address. The pointer itself
/// is const so it cannot be reseated by accident.
static const char* const DEFAULT_ADDR = "localhost:1883";
/**
 * Logs the context pointer and the topic name, then returns 0.
 * topicLen and message are not used.
 * NOTE(review): nothing in the visible code registers this function as a
 * callback - confirm whether it is still needed.
 */
int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    (void)topicLen;
    (void)message;
    const int result = 0;
    printf("delivered- %p, topic: %s\n", context, topicName);
    return result;
}
/**
 * Logs the context pointer. The delivery token dt is not used.
 * NOTE(review): nothing in the visible code registers this function as a
 * callback - confirm whether it is still needed.
 */
void delivered(void *context, MQTTClient_deliveryToken dt)
{
    (void)dt;
    void* const ctx = context;
    printf("delivered- %p\n", ctx);
}
/**
 * @brief Create an MQTT client, attempt one connection and start the
 *        background receive thread.
 *
 * The receive thread polls for messages with a 1000 ms timeout and hands each
 * one to the callback stored for its topic in sub_list. While the client is
 * disconnected the thread retries the connection, waiting one second after a
 * failed attempt, and re-subscribes once the connection is back.
 *
 * NOTE(review): sub_list is read by the receive thread without a lock while
 * the push/pop/unsub functions modify it - callers should finish registering
 * handlers before messages can arrive, or a mutex should be added.
 *
 * @param path server address; NULL or "" selects DEFAULT_ADDR
 * @param id   client id passed to MQTTClient_create
 *
 * @return the new handle, or NULL when allocation or MQTTClient_create fails
 */
mqtt_client mqtt_client_create(const char* path, const char* id)
{
    // nothrow: this function has C linkage, so allocation failure is
    // reported as NULL rather than as an exception.
    __mqtt_client* client = new (std::nothrow) __mqtt_client();
    if (client == nullptr) {
        return nullptr;
    }

    const char* addr = (path == NULL || path[0] == '\0') ? DEFAULT_ADDR : path;
    printf("create mqtt client, addr: %s\n", addr);
    if (MQTTClient_create(&client->mqtt, addr, id, MQTTCLIENT_PERSISTENCE_NONE, NULL) != MQTTCLIENT_SUCCESS) {
        delete client;
        return nullptr;
    }

    // Single initial attempt; the receive thread keeps retrying on failure.
    if (MQTTClient_connect(client->mqtt, &client->conn_opts) != MQTTCLIENT_SUCCESS) {
        printf("failed to connect to mqtt server\n");
    }

    client->is_running = true;
    client->thread = std::thread([client]() {
        while (client->is_running) {
            if (!MQTTClient_isConnected(client->mqtt)) {
                // Connection lost (or never established): try again.
                printf("reconnect\n");
                if (MQTTClient_connect(client->mqtt, &client->conn_opts) != MQTTCLIENT_SUCCESS) {
                    printf("failed to connect to mqtt server\n");
                    // Back off so an unreachable server does not turn this
                    // loop into a busy spin.
                    std::this_thread::sleep_for(std::chrono::seconds(1));
                    continue;
                }
                // Restore the subscriptions on the new connection.
                mqtt_client_sub(client);
                continue;
            }

            char* topic = NULL;
            int topicLen = 0;
            MQTTClient_message* msg = NULL;
            const int rc = MQTTClient_receive(client->mqtt, &topic, &topicLen, &msg, 1000);
            if (msg != NULL) {
                const char* payload = static_cast<const char*>(msg->payload);
                // The payload is not NUL-terminated: print exactly payloadlen bytes.
                printf("topic: %s, qos: %d, payload: %.*s\n", topic ? topic : "", msg->qos, msg->payloadlen, payload ? payload : "");
                if (topic != NULL) {
                    // Honour topicLen so a topic with embedded NULs is not cut short.
                    const std::string key = (topicLen > 0)
                        ? std::string(topic, static_cast<size_t>(topicLen))
                        : std::string(topic);
                    auto it = client->sub_list.find(key);
                    if (it != client->sub_list.end()) {
                        // Copy the handler so it stays valid even if the callback
                        // removes its own entry; skip empty (NULL) handlers.
                        const mqtt_msg_cb cb = std::get<1>(it->second);
                        void* const user_data = std::get<2>(it->second);
                        if (cb) {
                            cb(client, payload, static_cast<size_t>(msg->payloadlen), msg->qos, user_data);
                        }
                    }
                }
                MQTTClient_freeMessage(&msg);
            }
            if (topic != NULL) {
                MQTTClient_free(topic);
            }
            if (rc != MQTTCLIENT_SUCCESS) {
                printf("err: %d\n", rc);
            }
        }
    });
    return client;
}
/**
 * @brief Stop the receive thread, disconnect and free the client.
 *
 * @param client handle to release; NULL is accepted and is a no-op
 *
 * @return MQTTCLIENT_SUCCESS
 */
int mqtt_client_destroy(mqtt_client client)
{
    // Check before touching any member: the handle may be NULL.
    if (client == NULL) {
        return MQTTCLIENT_SUCCESS;
    }

    // Ask the receive thread to leave its loop, then wait for it so it can
    // no longer use the handle while it is being torn down.
    client->is_running = false;
    if (client->thread.joinable()) {
        client->thread.join();
    }

    if (client->mqtt) {
        MQTTClient_disconnect(client->mqtt, 500);
        MQTTClient_destroy(&client->mqtt);
    }

    delete client;
    return MQTTCLIENT_SUCCESS;
}
/**
 * @brief Store (or replace) the handler for a topic in the subscription list.
 *
 * Does nothing when client or topic is NULL.
 *
 * @param client    MQTT handle
 * @param topic     topic name used as the list key
 * @param qos       QoS stored with the entry
 * @param func      handler stored with the entry
 * @param user_data pointer stored with the entry
 */
void mqtt_client_sub_list_push(mqtt_client client, const char* topic, int qos, int(*func)(mqtt_client client, const char* payload, size_t len, int qos, void* user_data), void* user_data)
{
    if (!client || !topic) {
        return;
    }
    const std::string key(topic);
    client->sub_list[key] = std::make_tuple(qos, func, user_data);
}
/**
 * @brief Remove the entry for a topic from the subscription list.
 *
 * Does nothing when client or topic is NULL.
 *
 * @param client MQTT handle
 * @param topic  topic name whose entry is erased
 */
void mqtt_client_sub_list_pop(mqtt_client client, const char* topic)
{
    if (!client || !topic) {
        return;
    }
    const std::string key(topic);
    client->sub_list.erase(key);
}
/**
 * @brief Subscribe to every topic in the client's subscription list.
 *
 * All entries are attempted even when one of them fails.
 *
 * @param client MQTT handle
 *
 * @return MQTTCLIENT_SUCCESS when every subscription succeeded,
 *         MQTTCLIENT_FAILURE for a NULL or uninitialised handle,
 *         MQTTCLIENT_DISCONNECTED when not connected, otherwise the return
 *         code of the first MQTTClient_subscribe call that failed
 */
int mqtt_client_sub(mqtt_client client)
{
    if (client == NULL || !client->mqtt) {
        return MQTTCLIENT_FAILURE;
    }
    if (!MQTTClient_isConnected(client->mqtt)) {
        return MQTTCLIENT_DISCONNECTED;
    }

    int result = MQTTCLIENT_SUCCESS;
    for (const auto& entry : client->sub_list) {
        const char* topic = entry.first.c_str();
        const int qos = std::get<0>(entry.second);
        printf("MQTTClient_subscribe, topic :%s, qos: %d\n", topic, qos);
        const int rc = MQTTClient_subscribe(client->mqtt, topic, qos);
        if (rc != MQTTCLIENT_SUCCESS) {
            printf("MQTTClient_subscribe failed, topic :%s, rc: %d\n", topic, rc);
            // Report the first failure instead of silently returning success.
            if (result == MQTTCLIENT_SUCCESS) {
                result = rc;
            }
        }
    }
    return result;
}
/**
 * @brief Unsubscribe from a topic and drop its entry from the subscription list.
 *
 * A topic that is not in the list is left alone and reported as success.
 *
 * @param client MQTT handle
 * @param topic  topic name
 *
 * @return MQTTCLIENT_FAILURE for a NULL/uninitialised handle or NULL topic,
 *         MQTTCLIENT_DISCONNECTED when not connected, otherwise
 *         MQTTCLIENT_SUCCESS or the return code of MQTTClient_unsubscribe
 */
int mqtt_client_unsub(mqtt_client client, const char* topic)
{
    // A NULL topic must be rejected before it reaches std::string.
    if (client == NULL || !client->mqtt || topic == NULL) {
        return MQTTCLIENT_FAILURE;
    }
    if (!MQTTClient_isConnected(client->mqtt)) {
        return MQTTCLIENT_DISCONNECTED;
    }

    int rc = MQTTCLIENT_SUCCESS;
    auto it = client->sub_list.find(std::string(topic));
    if (it != client->sub_list.end()) {
        rc = MQTTClient_unsubscribe(client->mqtt, topic);
        // The entry is removed even when the broker call fails, so the
        // handler stops firing and the topic is not re-subscribed later.
        client->sub_list.erase(it);
    }
    return rc;
}
/**
 * @brief Publish a message with QoS 1, not retained.
 *
 * @param client       MQTT handle
 * @param topic        topic name
 * @param payload      payload bytes (may be NULL only when playload_len is 0)
 * @param playload_len payload length in bytes; must fit in an int
 *
 * @return MQTTCLIENT_FAILURE for invalid arguments,
 *         MQTTCLIENT_DISCONNECTED when not connected, otherwise the return
 *         code of MQTTClient_publish
 */
int mqtt_client_pub(mqtt_client client, const char* topic, const char* payload, size_t playload_len)
{
    if (client == NULL || !client->mqtt || topic == NULL) {
        return MQTTCLIENT_FAILURE;
    }
    if (payload == NULL && playload_len != 0) {
        return MQTTCLIENT_FAILURE;
    }
    // Both printf's "%.*s" precision and MQTTClient_publish take an int length.
    if (playload_len > static_cast<size_t>(INT_MAX)) {
        return MQTTCLIENT_FAILURE;
    }
    if (!MQTTClient_isConnected(client->mqtt)) {
        return MQTTCLIENT_DISCONNECTED;
    }

    const int len = static_cast<int>(playload_len);
    printf("MQTTClient_publish, topic :%s, playlad: %.*s\n", topic, len, payload ? payload : "");
    return MQTTClient_publish(client->mqtt, topic, len, payload, 1, 0, NULL);
}
测试
#include <unistd.h>
#ifndef _WIN32
#include <sys/prctl.h>
#endif
#include "mqtt_client/mqtt_client.h"
#include <string>
#include <thread>
// Coordinates that testThread places in the JSON payload it publishes.
static constexpr float TARGET_X = 0.0f;
static constexpr float TARGET_Y = 0.0f;
/**
 * Publishes the target position to "/pibot/body/move" id times, pausing
 * 100 ms after each publish.
 *
 * @param id     number of messages to publish; nothing is sent when id <= 0
 * @param client MQTT handle used for publishing
 */
void testThread(int id, mqtt_client client)
{
    // The payload never changes, so build it once outside the loop.
    const std::string str = "{\"x\":" + std::to_string(TARGET_X) + ",\"y\":" + std::to_string(TARGET_Y) + "}";

    // The counter must advance, otherwise the loop never terminates.
    for (int count = 0; count < id; ++count) {
        mqtt_client_pub(client, "/pibot/body/move", str.c_str(), str.length());
        usleep(100*1000);
    }
}
/**
 * Handler for "/pibot/pos" messages: prints the payload and the QoS.
 *
 * @param client    MQTT handle (unused)
 * @param playload  payload bytes; printed using len, so no terminator is needed
 * @param len       payload length in bytes
 * @param qos       QoS of the received message
 * @param user_data pointer registered with the handler (unused)
 *
 * @return always 0
 */
int pos_update_cb(mqtt_client client, const char* playload, size_t len, int qos, void* user_data)
{
    (void)client;
    (void)user_data;
    // "%.*s" expects an int precision, and the line ends with a real newline.
    printf("pos_update_cb, playload: %.*s, qos: %d\n", static_cast<int>(len), playload, qos);
    return 0;
}
/**
 * Test driver: creates a client, registers and subscribes the "/pibot/pos"
 * handler, runs two publisher threads to completion and destroys the client.
 *
 * @return 0 on normal completion, 1 when the client cannot be created
 */
int main()
{
    mqtt_client client = mqtt_client_create(NULL, "pibot");
    if (client == NULL) {
        printf("failed to create mqtt client\n");
        return 1; // report the failure through the exit status
    }

    mqtt_client_sub_list_push(client, "/pibot/pos", 2, pos_update_cb, NULL);
    if (mqtt_client_sub(client) != 0) {
        printf("err\n");
    }

    std::thread t1(testThread, 10000, client);
    std::thread t2(testThread, 20000, client);
    t1.join();
    t2.join();

    mqtt_client_destroy(client);
    return 0;
}
网友评论