美文网首页
串口通信(转载)

串口通信(转载)

作者: 汲之郎 | 来源:发表于2022-11-01 14:45 被阅读0次

    导入pyserial库和定义全局变量

    import serial
    import serial.tools.list_ports
    import threading
    
    com_rx_buf = ''         # 接收缓冲区
    com_tx_buf = ''         # 发送缓冲区
    COMM = serial.Serial()      # 定义串口对象
    port_list: list             # 可用串口列表
    port_select: list           # 选择好的串口
    

    port_list = serial.tools.list_ports.comports() # 返回值为可用的串口列表,其len属性为可用串口的数量

    post_list.len(),为2
    post_list[0].device字段为串口编号,为COM99
    post_list[1].device字段为串口编号,为COM97
    串口的编号,在设备管理器中是可以设置的,我一般是将指定设备安排一个比较大的串口号,列表的最后一项就是我想要的设备了。

    打开串口
    COMM = serial.Serial(serial_port, 115200, timeout=0.01)
    if COMM.isOpen():
    print(serial_port, "open success")
    return 0
    else:
    print("open failed")
    return 255

    关闭串口
    对应的是关闭串口,程序结束时必须关闭串口,否则,如果程序异常退出后,再次运行程序时,串口会因为已经处于打开状态而出错。

    COMM.close()
    1
    发送数据
    buf = [0,3,55,32] # buf的格式
    COMM.write(buf)

    接收数据
    buf = COMM.readall() 可以读取出串口接收的所有数据。可以开一个接收线程,里面是一个死循环,每经过一定的延迟时间,执行一次本指令即可。
    需要注意的是,如果对方发送的是一长串数据,又恰好在数据发送了一部分的时候延时时间到,这样收到的数据就不完全,此时可以再经过一个延迟,再执行一次readall(),即可读取全部数据了。这个工作放在接收线程中完成即可。

    全部程序如下,多出了好多函数,文件是从现有工程中摘出来的,捡有用的看吧。

    安装:pip3 install pyserial //python3

    import serial
    import serial.tools.list_ports
    import time
    import threading

    com_rx_buf = '' # 接收缓冲区
    com_tx_buf = '' # 发送缓冲区
    COMM = serial.Serial() # 定义串口对象
    port_list: list # 可用串口列表
    port_select: list # 选择好的串口

    无串口返回0,

    返回可用的串口列表

    def get_com_list():
    global port_list
    # a = serial.tools.list_ports.comports()
    # print(a)
    # port_list = list(serial.tools.list_ports.comports())
    port_list = serial.tools.list_ports.comports()
    return port_list

    def set_com_port(n=0):
    global port_list
    global port_select
    port_select = port_list[n]
    return port_select.device

    打开串口

    def serial_open(n=0):
    global COMM
    serial_port = set_com_port(n)
    COMM = serial.Serial(serial_port, 115200, timeout=0.01)
    if COMM.isOpen():
    print(serial_port, "open success")
    return 0
    else:
    print("open failed")
    return 255

    关闭串口

    def serial_close():
    global COMM
    COMM.close()
    print(COMM.name + "closed.")

    def set_com_rx_buf(buf=''):
    global com_rx_buf
    com_rx_buf = buf

    def set_com_tx_buf(buf=''):
    global com_tx_buf
    com_tx_buf = buf

    def get_com_rx_buf():
    global com_rx_buf
    return com_rx_buf

    def get_com_tx_buf():
    global com_tx_buf
    return com_tx_buf

    def thread_com_receive():
    while True:
    try:
    rx_buf = ''
    rx_buf = COMM.read() # 转化为整型数字
    if rx_buf != b'':
    time.sleep(0.01)
    rx_buf = rx_buf + COMM.read_all()
    print("串口收到消息:", rx_buf)
    time.sleep(0.01)
    except:
    pass
    pass

    def serial_encode(addr=0, command=0, param1=0, param0=0):

    buf = [addr, command, param1, param0, 0, 0, 0, 0]

    print(buf)

    return buf

    def serial_send_command(addr=0, command=0, param1=0, param0=0, data3=0, data2=0, data1=0, data0=0):
    buf = [addr, command, param1, param0, data3, data2, data1, data0]
    COMM.write(buf)
    pass

    def serial_init():
    buf = "AT+CG\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
    return 254 # 进入调试模式失败

    buf = "AT+CAN_MODE=0\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 253          # 进入正常模式失败,模块处于1状态,即环回模式中
    
    buf = "AT+CAN_BAUD=500000\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 253          # 波特率设置失败
    
    buf = "AT+FRAMEFORMAT=1,0,\r\n"
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 253          # 波特率设置失败
    
    buf = "AT+ET\r\n"       # 进入透传模式
    COMM.write(buf)
    time.sleep(0.05)
    buf = COMM.read_all()
    if buf != "OK\r\n":
        return 255  # 不是CAN模块
    

    if name == 'main':
    get_com_list()
    len = port_list.len()
    device = port_list[0].device
    print(len, device)
    serial_open()
    thread1 = threading.Thread(target=thread_com_receive)
    thread1.start()

    buf = [33, 2, 0x55, 1, 4]
    COMM.write(buf)
    
    # serial_close()
    

    判断串口打开还是关闭
    import socket

    def is_open(ip, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
    s.connect((ip, port))
    s.shutdown(2)
    print("{} is used".format(port))
    except:
    print("{} is unused".format(port))

    if name == 'main':
    is_open('127.0.0.1', 22)

    相关文章

      网友评论

          本文标题:串口通信(转载)

          本文链接:https://www.haomeiwen.com/subject/cwqkzrtx.html