美文网首页
MQTT客户端代码分析

MQTT客户端代码分析

作者: 海伯利安Hyperion | 来源:发表于2017-02-16 18:44 被阅读0次

    MQTT客户端实现

    MQTT是个轻量级的消息订阅/发布协议,基于TCP协议,在物联网中应用较广,当然也有的公司拿MQTT协议来做Push或IM。MQTT协议有很多客户端/服务端的实现,如Eclipse Paho就是其中一个。本文不对MQTT协议本身做介绍,而是主要分析下一个Paho MQTT客户端的代码实现。

    消息基类

    所有消息的基类就是MqttWireMessage,核心的方法无非是封包/拆包,创建包头,读取playload等等。各个消息子类如MqttSubscribe等,继承自MqttWireMessage,需要实现getMessageInfo、getVariableHeader(构造包头),getPayload(构造body)等方法

    重要接口

    IMqttAsyncClient 声明了与MQTT Server交互时的重要方法,如connect、publish、subscribe等,这些方法是异步的,有两种调用方式

    //方式一 
           IMqttToken conToken;
            conToken = asyncClient.client.connect(conToken);
          ... do some work...
            conToken.waitForCompletion();
            
            //或者这样就可以把一个异步任务转化成同步调用
            IMqttToken token;
          token = asyncClient.method(parms).waitForCompletion();
     
    
    
    //方式二,传入一个callback MqttAsyncActionListener,实现onSuccess及onFailure方法
    
            IMqttToken conToken;
            conToken = asyncClient.connect("some context",new new MqttAsyncActionListener() {
                public void onSuccess(IMqttToken asyncActionToken) {
                    log("Connected");
                }
     
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    log ("connect failed" +exception);
                }
              });
    

    我们可以把此处的IMqttToken理解为对异步的任务信息、操作的封装,里面包含了getTopics、setActionCallback、waitForCompletion、isComplete、getMessageId、getResponse等方法

    MqttAsyncClient是对IMqttAsyncClient接口的具体实现,里面包含两个重要的类:ClientComms,用来和服务器交互的类,封装了底层的网络调用;MqttClientPersistence,按照协议的QoS规定用来做消息的持久化。

    public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
                throws MqttException, MqttSecurityException {
            final String methodName = "connect";
            if (comms.isConnected()) {
                throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
            }
            ...
    
            //设置网络模块,NetworkModule是一个接口,包含start、stop、getInputStream、getOutputStream四个方法,
            //有TCPNetworkModule、SSLNetworkModule、LocalNetworkModule等不同的实现
            comms.setNetworkModules(createNetworkModules(serverURI, options));
    
            // Insert our own callback to iterate through the URIs till the connect succeeds
            //ConnectActionListener实现了IMqttActionListener接口,并把失败重试等逻辑都封装在该类里面;userToken是返回给上层调用者使用的对异步任务操作的封装类
            MqttToken userToken = new MqttToken(getClientId());
            ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback);
            userToken.setActionCallback(connectActionListener);
            userToken.setUserContext(this);
    
            comms.setNetworkModuleIndex(0);
            connectActionListener.connect();
    
            return userToken;
        }
    

    网络管理类

    ClientComms是网络层重要的管理类,包含几个主要的类:

    • NetworkModule 底层网络实现 CommsSender 和 CommsReceiver里的输入输出流来自于NetworkModule层的Socket
    • CommsReceiver 接收消息 起一个线程,通过MqttInputStream解析出消息
    • CommsSender 发送消息 起一个线程消费ClientState里的发送队列,通过MqttOutputStream往外写消息
    • ClientState 管理消息的发送,里面有Vector pendingMessages pendingFlows 待发送的消息,结合不同的Qos进行处理
    • CommsCallback 收到消息后的回调处理,是Receiver和外部API调用之间的桥梁

    ClientComms连接服务器时,会在一个异步线程ConnectBG中执行,包括启动网络模块,初始化CommsReceiver、CommsSender等

    // Connect to the server at the network level e.g. TCP socket and then
                    // start the background processing threads before sending the connect
                    // packet.
                    NetworkModule networkModule = networkModules[networkModuleIndex];
                    networkModule.start();
                    receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
                    receiver.start("MQTT Rec: "+getClient().getClientId());
                    sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
                    sender.start("MQTT Snd: "+getClient().getClientId());
                    callback.start("MQTT Call: "+getClient().getClientId());                
                    internalSend(conPacket, conToken);
                    
    

    通过internalSend方法,将要发送的消息插入到ClientState中的pendingMessages队列中,CommsSender会去消费这个队列,把消息取出来,写到MqttOutputStream里,并根据Qos设置做一些持久化操作。写成功后会有回调通知外部API调用。其他消息的发送也是这个流程。

    //CommsSender中的run方法
    while (running && (out != null)) {
                try {
                    message = clientState.get();
                    if (message != null) {
    
                        if (message instanceof MqttAck) {
                            out.write(message);
                            out.flush();
                        } else {
                            MqttToken token = tokenStore.getToken(message);
    
                            if (token != null) {
                                synchronized (token) {
                                    out.write(message);
                                    try {
                                        out.flush();
                                    } catch (IOException ex) {
                                        if (!(message instanceof MqttDisconnect)) {
                                            throw ex;
                                        }
                                    }
                                    clientState.notifySent(message);
                                }
                            }
                        }
                    } 
                } 
    
    

    CommsReceiver负责从InputStream读出消息,通过ClientState进行分发

    public void run() {
            final String methodName = "run";
            MqttToken token = null;
            
            while (running && (in != null)) {
                try {
                    //@TRACE 852=network read message
                    log.fine(CLASS_NAME,methodName,"852");
                    receiving = in.available() > 0;
                    MqttWireMessage message = in.readMqttWireMessage();
                    receiving = false;
                    
                    if (message instanceof MqttAck) {
                        token = tokenStore.getToken(message);
                        if (token!=null) {
                            synchronized (token) {
                                clientState.notifyReceivedAck((MqttAck)message);
                            }
                        } else {
                            throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
                        }
                    } else {
                        // A new message has arrived
                        clientState.notifyReceivedMsg(message);
                    }
                }
                catch (MqttException ex) {
                    running = false;
                    // Token maybe null but that is handled in shutdown
                    clientComms.shutdownConnection(token, ex);
                } 
                catch (IOException ioe) {
                    //@TRACE 853=Stopping due to IOException
                    log.fine(CLASS_NAME,methodName,"853");
    
                    running = false;
                    if (!clientComms.isDisconnecting()) {
                        clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
                    }
                }
                finally {
                    receiving = false;
                }
            }
    

    而在ClientState中,通过notifyReceivedMsg方法接收到消息,根据不同的Qos做持久化操作,并最终调用了CommsCallback的messageArrived方法,将消息加入到一个messageQueue队列中。

    CommsCallback里也起了一个线程消费这个队列,将取出的消息在handleMessage方法中通过MqttCallback接口回调出去。上文提到的发送完毕时,会将token插入到CommsCallback的completeQueue方法里进行消费,也是在这个run方法里

    //CommsCallback里的run方法
    while (running) {
                try {
    
                    if (running) {
                        // Check for deliveryComplete callbacks...
                        MqttToken token = null;
                        synchronized (completeQueue) {
                            if (!completeQueue.isEmpty()) {
                                // First call the delivery arrived callback if needed
                                token = (MqttToken) completeQueue.elementAt(0);
                                completeQueue.removeElementAt(0);
                            }
                        }
                        if (null != token) {
                            handleActionComplete(token);
                        }
                        
                        // Check for messageArrived callbacks...
                        MqttPublish message = null;
                        synchronized (messageQueue) {
                            if (!messageQueue.isEmpty()) {
                               
                                message = (MqttPublish) messageQueue.elementAt(0);
    
                                messageQueue.removeElementAt(0);
                            }
                        }
                        if (null != message) {
                            handleMessage(message);
                        }
                    }
    
                    
                    
                } 
    
    //处理发送完成的方法
    private void handleActionComplete(MqttToken token)
                throws MqttException {
            final String methodName = "handleActionComplete";
            synchronized (token) {
                
                // Unblock any waiters and if pending complete now set completed
                token.internalTok.notifyComplete();
                
                if (!token.internalTok.isNotified()) {
                    // If a callback is registered and delivery has finished 
                    // call delivery complete callback. 
                    if ( mqttCallback != null 
                        && token instanceof MqttDeliveryToken 
                        && token.isComplete()) {
                            mqttCallback.deliveryComplete((MqttDeliveryToken) token);
                    }
                    // Now call async action completion callbacks
                    fireActionEvent(token);
                }
                
                // Set notified so we don't tell the user again about this action.
                if ( token.isComplete() ){
                   if ( token instanceof MqttDeliveryToken || token.getActionCallback() instanceof IMqttActionListener ) {
                        token.internalTok.setNotified(true);
                    }
                }
                
    
                if (token.isComplete()) {
                    clientState.notifyComplete(token);
                }
            }
        }
    

    使用介绍

    以同步调用为例

        String tmpDir = System.getProperty("java.io.tmpdir");
            MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
    
            try {
                // Construct the connection options object that contains connection parameters
                // such as cleanSession and LWT
                conOpt = new MqttConnectOptions();
                conOpt.setCleanSession(clean);
                if(password != null ) {
                  conOpt.setPassword(this.password.toCharArray());
                }
                if(userName != null) {
                  conOpt.setUserName(this.userName);
                }
    
                // Construct an MQTT blocking mode client
                client = new MqttClient(this.brokerUrl,clientId, dataStore);
    
                // Set this wrapper as the callback handler
                client.setCallback(this);
    
            } catch (MqttException e) {
                e.printStackTrace();
                log("Unable to set up client: "+e.toString());
                System.exit(1);
            }
    

    设置各种连接参数,如用户名,密码,持久化存储路径等,并设置MqttCallback回调函数。

    public interface MqttCallback {
    //连接断开时的回调
    public void connectionLost(Throwable cause);
    //收到下推消息时的回调
    public void messageArrived(String topic, MqttMessage message) throws Exception;
    //消息发送成功时的回调
    public void deliveryComplete(IMqttDeliveryToken token);
    }
    

    想要发布一个消息时,可以这样

    public void publish(String topicName, int qos, byte[] payload) throws MqttException {
    
            client.connect(conOpt);
    
            String time = new Timestamp(System.currentTimeMillis()).toString();
    
            MqttMessage message = new MqttMessage(payload);
            message.setQos(qos);
    
            client.publish(topicName, message);
    
            client.disconnect();
        }
    
    

    这个publish方法是个同步方法,里面的实现其实是代理给异步client+wait阻塞实现的

    public void publish(String topic, MqttMessage message) throws MqttException,
                MqttPersistenceException {
            aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait());
        }
    

    publish调用后将消息插入到ClientState的队列中,通过CommsSender线程中发送给服务器,发送完成时(或收到ack后)会回调MqttCallback接口中的deliveryComplete方法。用户还可以设置IMqttActionListener接口获取发送是成功还是失败的回调。如果收到一个新的消息,最终通过MqttCallback中的messageArrived回调给用户。

    相关文章

      网友评论

          本文标题:MQTT客户端代码分析

          本文链接:https://www.haomeiwen.com/subject/xjphwttx.html