python使用MQTT给硬件传输图片

作者: 烟雨丿丶蓝 | 来源:发表于2019-05-05 19:44 被阅读4次

    任务简述

    最近因需要用python写一个微服务来用MQTT给硬件传输图片,其中python用的是flask框架,大概流程如下:

    image

    协议为:

    需要将图片数据封装成多个消息进行传输,每个消息传输的数据字节数为1400Byte。

    消息(MQTT Payload) 格式:Web服务器-------->BASE:

    image

    反馈:BASE---------> Web服务器:

    image

    如果Web服务器发送完一个“数据传输消息”后,5S内没有收到MQTT“反馈消息”或者收到的反馈中显示“数据包不完整”,则重发该“数据传输消息”。

    程序流程图

    根据上面的协议,可以得到如下的流程图:

    image

    推荐一个不错的Python编程群:556370268,里面都是爱好Python编程的小伙伴,可以在一起学习Python最新知识,一起提升技能,有问题也能一起解决,不管是刚学Python还是有一定的Python基础的小伙伴,这都是个不错的选择哦。

    代码如下:

    # encoding:utf-8
    from flask import Flask, jsonify
    from flask_restful import Api, Resource, reqparse
    from PIL import Image
    from io import BytesIO
    import requests
    import os, logging, time
    import paho.mqtt.client as mqtt
    import struct
    from flask_cors import *
    # 日志配置信息
    logging.basicConfig(
     level=logging.INFO,
     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s (runing by %(funcName)s',
    )
    class Mqtt(object):
     def __init__(self, img_data, size):
     self.MQTTHOST = '*******'
     self.MQTTPORT = "******"
     # 订阅和发送的主题
     self.topic_from_base = 'mqttTestSub'
     self.topic_to_base = 'mqttTestPub'
     self.client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
     self.client = mqtt.Client(self.client_id)
     # 完成链接后的回掉函数
     self.client.on_connect = self.on_connect
     # 图片大小
     self.size = size
     # 用于跳出死循环,结束任务
     self.finished = None
     # 包的编号
     self.index = 0
     # 将收到的图片数据按大小分成列表
     self.image_data_list = [img_data[x:x + 1400] for x in range(0, self.size, 1400)]
     # 记录发布后的数据,用于监控时延
     self.pub_time = 0
     self.header_to_base = 0xffffeeee
     self.header_from_base = 0xeeeeffff
     # 功能标识
     self.function_begin = 0x01
     self.function_doing = 0x02
     self.function_finished = 0x03
     # 包的完整和非完整状态
     self.whole_package = 0x01
     self.bad_package = 0x00
     # 头信息的格式,小端模式
     self.format_to_base = "<Lbhh"
     self.format_from_base = "<Lbhb"
     # 如果重发包时,用于检查是否重发第一个包
     self.first = True
     # 如果重发包时,用于检查是否重发最后一个包
     self.last = False
     self.begin_data = 'image.jpg;' + str(self.size)
     # 链接mqtt服务器函数
     def on_mqtt_connect(self):
     self.client.connect(self.MQTTHOST, self.MQTTPORT, 60)
     self.client.loop_start()
     # 链接完成后的回调函数
     def on_connect(self, client, userdata, flags, rc):
     logging.info("+++ Connected with result code {} +++".format(str(rc)))
     self.client.subscribe(self.topic_from_base)
     # 订阅函数
     def subscribe(self):
     self.client.subscribe(self.topic_from_base, 1)
     # 消息到来处理函数
     self.client.on_message = self.on_message
     # 接收到信息后的回调函数
     def on_message(self, client, userdata, msg):
     # 如果接受第一个包则不需要重发第一个
     self.first = False
     # 将接受到的包进行解压,得到一个元组
     base_tuple = struct.unpack(self.format_from_base, msg.payload)
     logging.info("+++ imageData's letgth is {}, base_tupe is {} +++".format(self.size, base_tuple))
     logging.info("+++ package_number is {}, package_status_from_base is {} +++"
     .format(base_tuple[2], base_tuple[3]))
     # 检查接受到信息的头部是否正确
     if base_tuple[0] == self.header_from_base:
     logging.info("+++ function_from_base is {} +++".format(base_tuple[1]))
     # 是否完成传输,如果完成则退出
     if base_tuple[1] == self.function_finished:
     logging.info("+++ finish work +++")
     self.finished = 1
     self.client.disconnect()
     else:
     # 是否是最后一个包
     if self.index == len(self.image_data_list) - 1:
     self.publish('finished', self.function_finished)
     self.last = True
     logging.info("+++ finished_data_to_base is finished+++")
     else:
     # 如果接收到的包不是 0x03则进行传送数据
     if base_tuple[1] == self.function_begin or base_tuple[1] == self.function_doing:
     logging.info("+++ package_number is {}, package_status_from_base is {} +++"
     .format(base_tuple[2],base_tuple[3]))
     # 如果数据的反馈中,包的状态是1则继续发下一个包
     if base_tuple[3] == self.whole_package:
     self.publish(self.index, self.function_doing)
     logging.info("+++ data_to_base is finished+++")
     self.index += 1
     # 如果数据的反馈中,包的状态是0则重发数据包
     elif base_tuple[3] == self.bad_package:
     re_package_number = base_tuple[2]
     self.publish(re_package_number-1, self.function_doing)
     logging.info("+++ re_data_to_base is finished+++")
     else:
     logging.info("+++ package_status_from_base is not 0 or 1 +++")
     self.client.disconnect()
     else:
     logging.info("+++ function_identifier is illegal +++")
     self.client.disconnect()
     else:
     logging.info("+++ header_from_base is illegal +++")
     self.client.disconnect()
     # 数据发送函数
     def publish(self, index, fuc):
     # 看是否是最后一个包
     if index == 'finished':
     length = 0
     package_number = 0
     data = b''
     else:
     length = len(self.image_data_list[index])
     package_number = index
     data = self.image_data_list[index]
     # 打包数据头信息
     buffer = struct.pack(
     self.format_to_base,
     self.header_to_base,
     fuc,
     package_number,
     length
     )
     to_base_data = buffer + data
     # mqtt发送
     self.client.publish(
     self.topic_to_base,
     to_base_data
     )
     self.pub_time = time.time()
     # 发送第一个包函数
     def publish_begin(self):
     buffer = struct.pack(
     self.format_to_base,
     self.header_to_base,
     self.function_begin,
     0,
     len(self.begin_data.encode('utf-8')),
     )
     begin_data = buffer + self.begin_data.encode('utf-8')
     self.client.publish(self.topic_to_base, begin_data)
     # 控制函数
     def control(self):
     self.on_mqtt_connect()
     self.publish_begin()
     begin_time = time.time()
     self.pub_time = time.time()
     self.subscribe()
     while True:
     time.sleep(1)
     # 超过5秒重传
     date = time.time() - self.pub_time
     if date > 5:
     # 是否重传第一个包
     if self.first == True:
     self.publish_begin()
     logging.info('+++ this is timeout first_data +++')
     # 是否重传最后一个包
     elif self.last == True:
     self.publish('finished', self.function_finished)
     logging.info('+++ this is timeout last_data +++')
     else:
     self.publish(self.index-1, self.function_doing)
     logging.info('+++ this is timeout middle_data +++')
     if self.finished == 1:
     logging.info('+++ all works is finished+++')
     break
     print(str(time.time()-begin_time) + 'begin_time - end_time')
    app = Flask(__name__)
    api = Api(app)
    CORS(app, supports_credentials=True)
    # 接受参数
    parser = reqparse.RequestParser()
    parser.add_argument('url', help='mqttImage url', location='args', type=str)
    class GetImage(Resource):
     # 得到参数并从图床下载到本地
     def get(self):
     args = parser.parse_args()
     url = args.get('url')
     response = requests.get(url)
     # 获取图片
     image = Image.open(BytesIO(response.content))
     # 存取图片
     add = os.path.join(os.path.abspath(''), 'image.jpg')
     image.save(add)
     # 得到图片大小
     size = os.path.getsize(add)
     f = open(add, 'rb')
     imageData = f.read()
     f.close()
     # 进行mqtt传输
     mqtt = Mqtt(imageData, size)
     mqtt.control()
     # 删除文件
     os.remove(add)
     logging.info('*** the result of control is {} ***'.format(1))
     return jsonify({
     "imageData": 1
     })
    api.add_resource(GetImage, '/image')
    if __name__ == '__main__':
     app.run(debug=True, host='0.0.0.0')
    

    相关文章

      网友评论

        本文标题:python使用MQTT给硬件传输图片

        本文链接:https://www.haomeiwen.com/subject/lyqkoqtx.html