参考源:http://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/index.html
原来有个项目用的是同步模式的mqtt机制(即在同一个线程里维护receive和send),但频繁发生断链的情况,加了重连机制后仍是没多少改善,因此研究了一下异步模式的mqtt。
主要内容:1.连接 2.订阅 3.监听消息 4.发布消息 5.断链重连
1.创建基础变量
① 创建MQTTAsync_connectOptions类型的变量conn_opt
② 创建MQTTAsync 类型的client变量
2.创建MQTTAsync_createOptions类型的变量
char struct_id [4] //必须是MQCO.
int struct_version //0或1,其中0标志no MQTTVersion,默认0
int sendWhileDisconnected //是否允许丢失连接时发送消息,默认0
int maxBufferedMessages //未连接时允许缓冲的最大消息数,默认100
int MQTTVersion //MQTT版本,有3.1,3.1.1,5,默认0
3.MQTTAsync_createWithOptions 将各种参数加入到client中
3.1内部检查option参数
3.2初始化socket模块,维护需要的套接字列表,其他参数置为空或0
clientsds 套接字
connect_pending 正在挂起连接的套接字
write_pending 待写的套接字
3.3设置socket回调函数等
3.4用上述信息,统一初始化client变量
client指针转换成MQTTAsyncs类型的指针再逐一赋值
4.MQTTAsync_setCallbacks 设置回调函数
为各种动作设置回调函数,对应动作不需要回调函数的就把入参设为NULL。
//handle和context其实是同一种类型,都是MQTTAsync,入参时无特殊情况直接将这两个参数穿client即可
MQTTAsync handle
void* context
//与服务器的连接丢失的情况下会调用它,可以在内部写重连机制,即每次丢失连接调用这个接口的时候,进行重连,不丢失则不会反复重连
MQTTAsync_connectionLost* cl
//接收到服务器消息的时候会调用它
MQTTAsync_messageArrived* ma
//客户端publish成功将会调用它
MQTTAsync_deliveryComplete* dc
5.为conn_opt赋值各种参数
这里需要注意,MQTTVersion使用版本不一样时,初始化conn_opt的宏以及需要赋值的参数内容也是有差异的。
默认用MQTTAsync_connectOptions_initializer初始化,使用5.0的时候将用MQTTAsync_connectOptions_initializer5进行初始化。其他的看下面的结构体内容解释。
typedef struct
{
char struct_id[4]; //MQTC
/** 这个参数必须是 0, 1, 2, 3 4 5 or 6.
* 0 表示不使用SSL,也不使用serverURIs
* 1 表示不使用serverURIs
* 2 表示没有设置版本信息
* 3 表示不使用自动重连
* 4 表示不使用二进制密码,即不设置binarypwd
* 5 表示没有MQTTV5的属性
默认设置这个值为6
*/
int struct_version;
//保活时间间隔
int keepAliveInterval;
//1 丢失连接时清楚所有这个连接的信息,包括订阅等,0与之相反,使用非5.0的时候设置这个参数
int cleansession;
int maxInflight; //这个值设置最大的在飞行中的消息数量
MQTTAsync_willOptions* will; //这个参数设置断开连接时的临终遗嘱消息,断开连接时发送消息到LWP topic
const char* username; //用户名
const char* password; //密码
int connectTimeout; //连接超时时间
int retryInterval; //重发未发送的消息的时间间隔,以秒为单位的内部时钟间隔
MQTTAsync_SSLOptions* ssl; //指向ssl设置参数的结构体指针,为NULL时表示不使用SSL
MQTTAsync_onSuccess* onSuccess; //连接成功时调用
MQTTAsync_onFailure* onFailure; //连接失败时调用
void* context; //赋值为client
int serverURIcount; //serverURIs数组大小
char* const* serverURIs; //服务器url数组,null结尾的字符串数组,地址格式为protocol://host:port。protocol可以是tcp或者ssl,host可以是ip地址或者域名
int MQTTVersion; //MQTT版本信息
int automaticReconnect; //自动重连
int minRetryInterval; //最小的重试间隔,以秒为单位,没失败一次,值增加一倍
int maxRetryInterval; //最大的重连时间间隔,最小重连时间间隔超过这个值时停止
struct {
int len; /**< binary password length */
const void* data; /**< binary password data */
} binarypwd; //二进制密码设置,目前没去了解
int cleanstart; //使用5.0的时候设置这个参数
//MQTTV5的一些属性设置
MQTTProperties *connectProperties;
MQTTProperties *willProperties;
MQTTAsync_onSuccess5* onSuccess5;
MQTTAsync_onFailure5* onFailure5;
} MQTTAsync_connectOptions;
6.建立连接
MQTTAsync_connect(clinet,&conn_opts),使用上述的client和conn_opts作为参数,调用connect接口进行连接。
需要注意的是,这个接口返回成功并不是说连接就成功了,只能说明这个接口调用成功,一些检查也成功了。
根据源码可以看到,连接成功后会调用onSuccess,也可以说,可以通过是否调用到了onSuccess接口,判断连接成功与否。
连接成功后可以做订阅发消息等操作。在项目中我在调用connect之后做了一个监听,MQTTAsync_isConnected返回值是1了表示连接成功,再做后继操作。
MQTTAsync_connect接口调用时,内部会生成两个线程,一个用于接收一个用于发送,发送和接收互不影响。断开连接时,如果设置了cleansession,线程停止,重连时重新创建。
7.订阅
一般在conn_opts的参数onSuccess里进行订阅。在上面说了,这个参数所设置的接口会在连接成功的时候调用,那么一些定死的主题在这里订阅即可。
使用到的订阅主题的接口:MQTTAsync_subscribe
这个接口的参数也可以设置成功调用的接口、失败调用的接口,可加可不加,这里就不主要讲了。接口最后一个入参,同样要区分v5和非V5。
V5:MQTTAsync_callOptions copts = MQTTAsync_callOptions_initializer;
非V5:MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
两者的context赋值client
8.监听消息
MQTTAsync_setCallbacks设置的MQTTAsync_messageArrived会实时监听消息,接收到消息后,解析MQTTAsync_message结构体指针所包含的内容,对消息的处理将在这里进行,包括解析命令、存储数据库等操作。
9.发布消息
调用到的接口:MQTTAsync_send
MQTTAsync handle //赋值为client
const char* destinationName //目标主题名称
int payloadlen //消息长度
const void* payload //消息内容
/*
QoS0,最多一次:消息最多传递一次,或者可能根本不会传递。它通过网络的传递是不被认可的。消息没有被存储。如果客户端断开连接,或者服务器出现故障,消息可能会丢失。QoS0是最快的传输方式。它有时被称为“火与遗忘”。
MQTT协议不要求服务器将QoS0上的发布转发到客户机。如果客户机在服务器接收发布时断开连接,则发布可能被丢弃,具体取决于服务器实现。
QoS1,至少一次:消息总是至少传递一次。如果在发送方接收到确认之前出现故障,则可能多次发送。消息必须存储在发送方的本地,直到发送方收到确认消息已由接收方发布的消息。消息被存储起来,以防必须再次发送消息。
QoS2,只发送一次:消息总是只发送一次。消息必须存储在发送方的本地,直到发送方收到确认消息已由接收方发布的消息。消息被存储起来,以防必须再次发送消息。QoS2是最安全但最慢的传输方式。与QoS1相比,QoS1使用了更复杂的握手和确认序列,以确保不会出现重复消息。
*/
int qos
int retained //消息的保留标志,传0即可
MQTTAsync_responseOptions* response //一个指向::MQTTAsync_responseOptions结构的指针。用于设置回调函数。若没有需要调用的回调函数,可以设置为NULL。
10.断链处理
上面MQTTAsync_setCallbacks设置的MQTTAsync_connectionLost在这时就起到作用了。断链时会调用到这个接口,在这个接口内部判断是否是连接状态,若不是,就再次调用MQTTAsync_connect接口进行重连。
网友评论