美文网首页
使用paho client c封装一个mqtt

使用paho client c封装一个mqtt

作者: PIBOT导航机器人 | 来源:发表于2020-10-15 10:57 被阅读0次

1.paho.mqtt.c库编译

git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c
mkdir build
cd build
cmake -DCMAKE_INSTALL_PREFIX=$PWD/../out ../
make && make install

这样就编译并安装好了, 产物位于 paho.mqtt.c/out 目录(即 CMAKE_INSTALL_PREFIX 指定的路径), 目录结构如下:

├── bin
│   └── MQTTVersion
├── include
│   ├── MQTTAsync.h
│   ├── MQTTClient.h
│   ├── MQTTClientPersistence.h
│   ├── MQTTExportDeclarations.h
│   ├── MQTTProperties.h
│   ├── MQTTReasonCodes.h
│   └── MQTTSubscribeOpts.h
├── lib
│   ├── cmake
│   │   └── eclipse-paho-mqtt-c
│   │       ├── eclipse-paho-mqtt-cConfig.cmake
│   │       ├── eclipse-paho-mqtt-cConfig-noconfig.cmake
│   │       └── eclipse-paho-mqtt-cConfigVersion.cmake
│   ├── libpaho-mqtt3a.so -> libpaho-mqtt3a.so.1
│   ├── libpaho-mqtt3a.so.1 -> libpaho-mqtt3a.so.1.3.6
│   ├── libpaho-mqtt3a.so.1.3.6
│   ├── libpaho-mqtt3c.so -> libpaho-mqtt3c.so.1
│   ├── libpaho-mqtt3c.so.1 -> libpaho-mqtt3c.so.1.3.6
│   └── libpaho-mqtt3c.so.1.3.6

2.封装一个client同时支持pub和sub

paho.mqtt.c 自带的示例把发布(pub)和订阅(sub)分成了两个独立的程序。这里我们基于 C++11 做一个简单的封装, 让同一个 client 同时支持 pub 和 sub: 用哈希表(std::unordered_map)保存已订阅的 topic 及其对应的消息回调, 收到消息后按 topic 查表并调用回调。

// mqtt_client.h

#ifndef MQTT_CLIENT_H_
#define MQTT_CLIENT_H_

#include <stddef.h>  /* size_t, used by the callback and publish signatures below */

#ifdef __cplusplus
extern "C" {
#endif

/** Opaque client handle; the struct itself is defined by the implementation file. */
typedef struct __mqtt_client * mqtt_client;

/**
 * @brief Create an MQTT client.
 *
 * @param path MQTT server address; NULL or an empty string selects the
 *             default "localhost:1883"
 * @param id client id
 *
 * @return client handle, or NULL when the client could not be created
 */
mqtt_client mqtt_client_create(const char* path, const char* id);

/**
 * @brief Release an MQTT client created by mqtt_client_create().
 *
 * @param client client handle; must not be used after this call
 *
 * @return 0 on success, non-zero on failure
 */
int mqtt_client_destroy(mqtt_client client);

/**
 * @brief Add (or replace) the handler for a topic in the client's subscription list.
 *
 * This only updates the local list; call mqtt_client_sub() to subscribe the
 * listed topics on the server.
 *
 * @param client client handle
 * @param topic topic name
 * @param qos QoS used when the topic is subscribed
 * @param func function invoked for a message received on this topic
 * @param user_data opaque pointer passed back to func
 */
void mqtt_client_sub_list_push(mqtt_client client, const char* topic, int qos, int(*func)(mqtt_client client, const char* playload, size_t len, int qos, void* user_data), void* user_data);

/**
 * @brief Remove a topic's handler from the client's subscription list.
 *
 * This only updates the local list; use mqtt_client_unsub() to also
 * unsubscribe on the server.
 *
 * @param client client handle
 * @param topic topic name
 */
void mqtt_client_sub_list_pop(mqtt_client client, const char* topic);

/**
 * @brief Subscribe to every topic currently in the subscription list.
 *
 * @param client client handle
 *
 * @return 0 on success, non-zero on failure
 */
int mqtt_client_sub(mqtt_client client);

/**
 * @brief Unsubscribe from a topic and drop it from the subscription list.
 *
 * @param client client handle
 * @param topic topic name
 *
 * @return 0 on success, non-zero on failure
 */
int mqtt_client_unsub(mqtt_client client, const char* topic);

/**
 * @brief Publish a message.
 *
 * @param client client handle
 * @param topic topic name
 * @param playload payload bytes
 * @param playload_len payload length in bytes
 *
 * @return 0 on success, non-zero on failure
 */
int mqtt_client_pub(mqtt_client client, const char* topic, const char* playload, size_t playload_len);

#ifdef __cplusplus
}
#endif

#endif // MQTT_CLIENT_H_
#include "paho_mqtt/MQTTClient.h"

#include "mqtt_client/mqtt_client.h"

#include <limits.h>
#include <stdio.h>
#include <string.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <mutex>
#include <new>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>

// Handler stored per subscribed topic. It is called with the client handle,
// the message payload pointer and length, the message QoS, and the user_data
// pointer that was registered together with the handler.
using mqtt_msg_cb = std::function<int(mqtt_client client, const char* payload, size_t len, int qos, void* user_data)>;

/**
 * State behind the opaque mqtt_client handle.
 *
 * NOTE(review): sub_list is read by the worker thread and modified through
 * the sub-list API functions with no lock visible in this file; callers
 * should populate it before messages can arrive, or a mutex should be added.
 */
struct __mqtt_client {
    __mqtt_client() {
        // The Paho initializer is a brace list, so it is assigned here rather
        // than in the member-initializer list.
        conn_opts = MQTTClient_connectOptions_initializer;
    }

    // Paho client handle; NULL until MQTTClient_create() succeeds.
    MQTTClient mqtt = NULL;
    // Connect options reused for the first connect and every reconnect.
    MQTTClient_connectOptions conn_opts;
    // Stop flag for the worker thread. Atomic because it is written by the
    // thread that destroys the client and read by the worker thread.
    std::atomic<bool> is_running{false};
    // topic -> (qos, message handler, user_data handed back to the handler)
    std::unordered_map<std::string, std::tuple<int, mqtt_msg_cb, void*>> sub_list;
    // Worker thread that reconnects and dispatches received messages.
    std::thread thread;
};

// Server address used by mqtt_client_create() when the caller passes NULL or an empty path.
static const char* DEFAULT_ADDR = "localhost:1883";

int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    printf("delivered- %p, topic: %s\n", context, topicName);

    return 0;
}

/**
 * Delivery-complete callback: logs the context pointer it was called with.
 *
 * @param context application context pointer
 * @param dt      delivery token of the completed publish (not reported)
 */
void delivered(void *context, MQTTClient_deliveryToken dt)
{
    (void)dt;
    fprintf(stdout, "delivered- %p\n", context);
}

mqtt_client mqtt_client_create(const char* path, const char* id)
{
    __mqtt_client* client = new __mqtt_client();

    if (client) {
        if (client->mqtt == NULL) {
            const char* addr = path;
            if (addr == NULL || strlen(addr) == 0) {
                addr = DEFAULT_ADDR;
            }

            printf("create mqtt client, addr: %s\n", addr);
            int rc = MQTTClient_create(&client->mqtt, addr, id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
            if (rc != MQTTCLIENT_SUCCESS) {
                delete client;
                client = nullptr;
                return nullptr;
            }
        }

        // 连接一次
        int rc = MQTTClient_connect(client->mqtt, &client->conn_opts);
        if (rc != MQTTCLIENT_SUCCESS) {
            printf("failed to connect to mqtt server\n");
        }
        
        client->is_running = true;
        client->thread = std::thread([client]()
        {
            while (client->is_running) {
                if (!MQTTClient_isConnected(client->mqtt)) {    // 断开后重新连接
                    printf("reconnect\n");
                    int rc = MQTTClient_connect(client->mqtt, &client->conn_opts);
                    if (rc != MQTTCLIENT_SUCCESS) {
                        printf("failed to connect to mqtt server\n");
                    }

                    // 重新sub
                    mqtt_client_sub(client);
                } else {
                    char* topic = NULL;
                    int topicLen;
                    MQTTClient_message* msg = NULL;

                    int rc = MQTTClient_receive(client->mqtt, &topic, &topicLen, &msg, 1000);
                    if (msg) {
                        printf("tocc: %s, qos: %d, payload: %s\n", topic, msg->qos, msg->payload);

                        auto it = client->sub_list.find(std::string(topic));
                        if (it != client->sub_list.end()) {
                            std::get<1>(it->second)(client, (const char*)msg->payload, msg->payloadlen, msg->qos, std::get<2>(it->second));
                        }

                        MQTTClient_freeMessage(&msg);
                        MQTTClient_free(topic);
                    }
                    if (rc != MQTTCLIENT_SUCCESS) {
                        printf("err: %d\n", rc);
                    }
                }
            }
        });
    }

    return client;
}

/**
 * Stop the worker thread, disconnect, and free the client.
 *
 * @param client handle from mqtt_client_create(); NULL is accepted and ignored
 *
 * @return MQTTCLIENT_SUCCESS
 */
int mqtt_client_destroy(mqtt_client client)
{
    // Check the handle before touching it; a NULL handle is a no-op.
    if (client == nullptr) {
        return MQTTCLIENT_SUCCESS;
    }

    // Ask the worker loop to exit, then wait for it so it no longer uses the
    // Paho handle that is destroyed below.
    client->is_running = false;
    if (client->thread.joinable()) {
        client->thread.join();
    }

    if (client->mqtt) {
        MQTTClient_disconnect(client->mqtt, 500);
        MQTTClient_destroy(&client->mqtt);
    }

    delete client;

    return MQTTCLIENT_SUCCESS;
}

/**
 * Store (or overwrite) the qos, handler and user_data for a topic in the
 * client's subscription list. Does nothing when client or topic is NULL.
 */
void mqtt_client_sub_list_push(mqtt_client client, const char* topic, int qos, int(*func)(mqtt_client client, const char* payload, size_t len, int qos, void* user_data), void* user_data)
{
    if (!client || !topic) {
        return;
    }

    const std::string key(topic);
    client->sub_list[key] = std::make_tuple(qos, func, user_data);
}

/**
 * Erase a topic's entry from the client's subscription list.
 * Does nothing when client or topic is NULL.
 */
void mqtt_client_sub_list_pop(mqtt_client client, const char* topic)
{
    if (!client || !topic) {
        return;
    }

    const std::string key(topic);
    client->sub_list.erase(key);
}

/**
 * Subscribe to every topic in the client's subscription list, each with its
 * stored QoS.
 *
 * All topics are attempted even if one fails, so a single rejected topic
 * does not prevent the others from being subscribed.
 *
 * @param client client handle
 *
 * @return MQTTCLIENT_FAILURE for a NULL or uninitialized client,
 *         MQTTCLIENT_DISCONNECTED when not connected, otherwise
 *         MQTTCLIENT_SUCCESS or the first error returned by
 *         MQTTClient_subscribe()
 */
int mqtt_client_sub(mqtt_client client)
{
    if (client == NULL || !client->mqtt) {
        return MQTTCLIENT_FAILURE;
    }

    if (!MQTTClient_isConnected(client->mqtt)) {
        return MQTTCLIENT_DISCONNECTED;
    }

    int result = MQTTCLIENT_SUCCESS;
    for (const auto& entry : client->sub_list) {
        const char* topic = entry.first.c_str();
        const int qos = std::get<0>(entry.second);

        printf("MQTTClient_subscribe, topic :%s, qos: %d\n", topic, qos);
        const int rc = MQTTClient_subscribe(client->mqtt, topic, qos);
        if (rc != MQTTCLIENT_SUCCESS && result == MQTTCLIENT_SUCCESS) {
            result = rc;    // remember the first failure for the caller
        }
    }

    return result;
}

/**
 * Unsubscribe from a topic and remove it from the client's subscription list.
 *
 * A topic that is not in the list is left alone and reported as success.
 * The list entry is removed even when the server-side unsubscribe fails, so
 * the topic is not subscribed again by a later mqtt_client_sub().
 *
 * @param client client handle
 * @param topic  topic name
 *
 * @return MQTTCLIENT_FAILURE for a NULL/uninitialized client or NULL topic,
 *         MQTTCLIENT_DISCONNECTED when not connected, otherwise
 *         MQTTCLIENT_SUCCESS or the error returned by MQTTClient_unsubscribe()
 */
int mqtt_client_unsub(mqtt_client client, const char* topic)
{
    // topic is checked too: building a std::string from NULL is undefined.
    if (client == NULL || !client->mqtt || topic == NULL) {
        return MQTTCLIENT_FAILURE;
    }

    if (!MQTTClient_isConnected(client->mqtt)) {
        return MQTTCLIENT_DISCONNECTED;
    }

    int result = MQTTCLIENT_SUCCESS;
    auto it = client->sub_list.find(std::string(topic));
    if (it != client->sub_list.end()) {
        result = MQTTClient_unsubscribe(client->mqtt, topic);
        client->sub_list.erase(it);
    }

    return result;
}

/**
 * Publish a payload to a topic with QoS 1, not retained.
 *
 * @param client       client handle
 * @param topic        topic name
 * @param payload      payload bytes; may be NULL only when playload_len is 0
 * @param playload_len payload length in bytes
 *
 * @return MQTTCLIENT_FAILURE for invalid arguments (NULL/uninitialized
 *         client, NULL topic, NULL payload with non-zero length, or a
 *         length that does not fit in int), MQTTCLIENT_DISCONNECTED when
 *         not connected, otherwise the result of MQTTClient_publish()
 */
int mqtt_client_pub(mqtt_client client, const char* topic, const char* payload, size_t playload_len)
{
    if (client == NULL || !client->mqtt || topic == NULL) {
        return MQTTCLIENT_FAILURE;
    }

    if (payload == NULL && playload_len != 0) {
        return MQTTCLIENT_FAILURE;
    }

    // MQTTClient_publish() and the printf precision both take an int length.
    if (playload_len > static_cast<size_t>(INT_MAX)) {
        return MQTTCLIENT_FAILURE;
    }
    const int len = static_cast<int>(playload_len);

    if (!MQTTClient_isConnected(client->mqtt)) {
        return MQTTCLIENT_DISCONNECTED;
    }

    printf("MQTTClient_publish, topic :%s, playlad: %.*s\n", topic, len, payload ? payload : "");

    return MQTTClient_publish(client->mqtt, topic, len, payload, 1, 0, NULL);
}

测试


#include <unistd.h>
#ifndef _WIN32
#include <sys/prctl.h>
#endif

#include "mqtt_client/mqtt_client.h"

#include <string>
#include <thread>

// x / y values that testThread() puts into the JSON payload it publishes.
const static float TARGET_X = 0.0f;
const static float TARGET_Y = 0.0f;

/**
 * Publish the target position to "/pibot/body/move" `id` times, pausing
 * 100 ms after each publish.
 *
 * @param id     number of messages to publish (zero or negative publishes nothing)
 * @param client client handle used for publishing
 */
void testThread(int id, mqtt_client client)
{
    // The counter must advance, otherwise the loop never reaches its limit.
    for (int count = 0; count < id; ++count) {
        std::string str = "{\"x\":" + std::to_string(TARGET_X) + ",\"y\":" + std::to_string(TARGET_Y) + "}";

        mqtt_client_pub(client, "/pibot/body/move", str.c_str(), str.length());
        usleep(100*1000);
    }
}

/**
 * Handler for "/pibot/pos" messages: prints the payload and QoS.
 *
 * @param client    client handle that received the message (unused)
 * @param playload  payload bytes (not NUL-terminated)
 * @param len       payload length in bytes
 * @param qos       QoS of the received message
 * @param user_data pointer registered with the handler (unused)
 *
 * @return 0
 */
int pos_update_cb(mqtt_client client, const char* playload, size_t len, int qos, void* user_data)
{
    (void)client;
    (void)user_data;

    // The "%.*s" precision must be an int, and the line ends with a real newline.
    printf("pos_update_cb, playload: %.*s, qos: %d\n", static_cast<int>(len), playload, qos);

    return 0;
}

/**
 * Test driver: creates a client, registers a handler for "/pibot/pos",
 * subscribes, runs two publisher threads to completion, then destroys the
 * client.
 *
 * @return 0 on a normal run, 1 when the client cannot be created
 */
int main()
{
    mqtt_client client = mqtt_client_create(NULL, "pibot");
    if (client == NULL) {
        printf("failed to create mqtt client\n");
        return 1;   // report the failure through the exit status
    }

    mqtt_client_sub_list_push(client, "/pibot/pos", 2, pos_update_cb, NULL);

    if (mqtt_client_sub(client) != 0) {
        printf("err\n");
    }

    std::thread t1 = std::thread(testThread, 10000, client);
    std::thread t2 = std::thread(testThread, 20000, client);
    t1.join();
    t2.join();

    mqtt_client_destroy(client);

    return 0;
}

相关文章

网友评论

      本文标题:使用paho client c封装一个mqtt

      本文链接:https://www.haomeiwen.com/subject/pnynpktx.html