美文网首页
esp8266-micropython常用类库WiFi,web,

esp8266-micropython常用类库WiFi,web,

作者: yichen_china | 来源:发表于2024-04-20 14:17 被阅读0次

    烧录

    进入固件官网:https://micropython.org/download/esp8266/

    开关灯

    from machine import Pin
    import time
    LED = Pin(2, Pin.OUT)
    while True:
        LED.value(1)
        time.sleep(1)
        LED.value(0)
        time.sleep(1)
    

    wifi联网

    from machine import WiFi
    import time
    import usocket as socket
    import network
    def wifi_connect(wlan):
    #     if not wlan.isconnected():
            # 打印正在连接信息
        print('connecting to network...')
        wifiname = 'hy1'  # 替换SSID
        password = '13363641456'  # 替换password
        print("正在链接WIFI:",wifiname,",pass:",password)
        # 要连接的WiFi名,密码
        wlan.connect(wifiname, password) # connect to an AP
        # 死循环,直到连接成功  #可以尝试添加时间超限
        while not wlan.isconnected():
            # 打印连接信息
            print("正在重试链接WIFI:",wifiname)
            # 休息3s再继续打印
            time.sleep(1)
            pass
        # 打印网络配置信息,包括IP地址等
        print('network config:', wlan.ifconfig())
        time.sleep(1)
        return wlan.isconnected()
        #server.start()
    def wifi_init():
        wlan = network.WLAN(network.STA_IF)
        wlan.active(True)
        all_wifi_info = wlan.scan()
    #     time.sleep(1)
        for wifi_info in all_wifi_info:#wifi_name 是个元组类型数据 index 0 是 WiFi名字  数据类型都是bytes
            if wifi_info[0] != b'':#舍弃空名字
                print(wifi_info[0].decode("utf-8"))#bytes 转 str  解码
        print('------')
        print('发现的可用WiFi数量{}'.format(len(all_wifi_info)))
        return wlan
    

    使用方法

        wlan = wifi_init()
        result = wifi_connect(wlan)
    

    设置wifi链接超时时间

    from machine import WiFi
    import time
    
    wifi = WiFi()
    
    # 设置连接超时时间为10秒
    wifi.config(sta_timeout=10)
    
    while True:
        # 等待WiFi连接成功或超时
        connected = wifi.wait_for_connection(timeout=10)
        if connected:
            print("Connected to network.")
        else:
            print("Connection timed out.")
        time.sleep(5)  # 每5秒检查一次连接状态
    

    安装依赖库

    联网后控制台可以使用 micropython 相关语法了如下:
    控制台可以直接使用函数等功能

    
    >>> upip.install('umqtt.simple')
    >>> import ujson  # JSON库 
    >>> import ure  # 正则表达式库 
    # 导入 urequests 库和 machine 库
    >>>import urequests
    

    web服务 WebService.py

    import socket, time, re
    #import socket
    class WebService:
        def __init__(self,SERVER,PORT,):
            self.SERVER = SERVER
            self.PORT = PORT
            self.CLIENT = None
            print("启动web服务器2")
        def run(self):
            webserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)    #创建套接字
            webserver.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  #设置给定套接字选项的值
            #webserver.settimeout(2000)
            #ip="192.168.4.1" #AP's IP
    #         a=1
            webserver.bind((self.SERVER, self.PORT))                                       #绑定IP地址和端口号
            webserver.listen(5)                                              #监听套接字
            print("web:",self.SERVER,":",self.PORT)
    #         print(self.PORT)
            while True:
                conn, addr = webserver.accept()                                #接受一个连接,conn是一个新的socket对象
                request = conn.recv(1024)                              #从套接字接收1024字节的数据
                 # 打印客户端发送的数据
                print('Content = %s' % str(request))
                if len(request)>0:
                    request = request.decode()
                    result = re.search("(.*?) (.*?) HTTP/1.1", request)
                    if result:
                        method = result.group(1)
                        url = result.group(2)
                        print(url)
                        if method == "POST":#获得post数据
                            postdata = re.search(".*?\r\n\r\n(.*)", request).group(1)
                            if postdata:
                                lists = postdata.split("&")
                                payload = {}#将post数据存数组
                                for list in lists:
                                    k,v = list.split("=")
                                    payload[k]=v
                          #print(payload)
    
                        #conn.sendall("HTTP/1.1 200 OK\nConnection: close\nServer: Esp8266\nContent-Type: text/html;charset=UTF-8\n\n")
                        conn.send("HTTP/1.1 200 OK\r\n")
                        conn.send("Server: Esp8266\r\n")
                        conn.send("Content-Type: text/html;charset=UTF-8\r\n")
                        conn.send("Connection: close\r\n")
                        conn.send("\r\n")
                        if url =="/":
                            conn.sendall(html)
                        elif url == "/12.php":
                            conn.sendall(payload["post"])
                            conn.send("\r\n")  # 发送结束
                        else:
                            print("not found url")
                        else:
                            print("no request")
                            conn.close()
                            print("out %s" % str(addr))
    
    

    使用

    from src.WebService import WebService
    web = WebService("0.0.0.0",80)
    web.run()
    

    mqqt服务

    依赖umqtt.simple
    安装依赖,thonny连接终端,先配置好wifi联网后,在8266系统内终端执行如下命令,或者代码里加一行代码安装

    upip.install('umqtt.simple')
    
    from umqtt.simple import MQTTClient
    class MQTTService:
        def __init__(self,CLIENT_ID,SERVER,PORT,):
            self.CLIENT_ID = CLIENT_ID
            self.SERVER = SERVER
            self.PORT = PORT
            self.CLIENT = None
            
        def user_connect(self):
            self.CLIENT = MQTTClient(self.CLIENT_ID, self.SERVER, self.PORT)
            self.CLIENT.set_callback(self.sub_cb)  # 设置回调函数
            print('尝试连接"{server}"'.format(server = self.CLIENT_ID))
            # self.CLIENT.set_callback(sub_cb)
            try:
                self.CLIENT.connect()
                #print('Connected to MQTT Broker "{server}"'.format(server = self.SERVER))
            except:
                print('失败尝试重新连接')
                self.user_connect()
                
            #return self.client
     
        def sub_cb(self,topic, msg): # 回调函数,收到服务器消息后会调用这个函数
            print('{}   {} '.format(topic.decode("utf-8"), msg.decode("utf-8")))
     
     
        def subscribe(self, topic):  # 修正变量名,使用方法参数而不是实例属性  
            self.CLIENT.subscribe(topic)  # 订阅传入的topic  
     
     
        def check_msg(self):  
            self.CLIENT.check_msg()  
     
        def disconnect(self):  
     
            if self.CLIENT:  
     
                self.CLIENT.disconnect()  
     
                print('已从MQTT代理断开连接')  
    
    

    使用

    from src.MQTTService import MQTTService
    mqtt = MQTTService("message","127.0.0.1",5535)
    mqtt.user_connect()
    

    http请求库urequests

    发送网络请求, 它会阻塞返回网络的响应数据,参数:

    method 建立网络请求的方法,例如 HEAD,GET,POST,PUT,PATCH, DELETE。
    url 网络请求的URL(网址)。
    data(可选)在请求正文中发送的字典或元组列表[(键,值)](将是表单编码的),字节或类文件对象。
    json(可选)在请求正文中发送的json数据。
    headers(可选)要与请求一起发送的HTTP标头字典。

    urequests.request(method, url, data=None, json=None, headers={})
    

    更多方法

    import urequests
    # 发送一个 HEAD 请求 **kw request可选的参数,他就是request函数url参数后面的那些
    urequests.head(url, **kw)
    ##get
    urequests.get(url, **kw)
    # post
    urequests.post(url, **kw)
    urequests.put(url, **kw)
    urequests.patch(url, **kw)
    urequests.delete(url, **kw)
    

    返回响应的内容,以字节为单位。
    urequests.content
    以文本方式返回响应的内容,编码为unicode。
    urequests.text
    返回响应的json编码内容并转为dict类型。
    urequests.json()
    示例
    文档https://mpython.readthedocs.io/zh/master/library/mPython/urequests.html

    import urequests
    
    url = "https://api.example.com/data"  # 替换为您要请求的URL
    
    # 发送GET请求
    response = urequests.get(url)
    
    # 检查响应状态码
    if response.status_code == 200:
        # 获取响应内容
        content = response.text
        print("Response:", content)
    else:
        print("Error:", response.status_code)
    
    # 关闭响应连接
    response.close()
    

    定时器+urequests完成http请求温度传感器

    requests 库来向 ThingSpeak 平台发送温度数据

    # 导入 urequests 库和 machine 库
    import urequests
    import machine
    
    # 创建一个 ADC 对象,连接到 G3 引脚(WiPy 上的温度传感器)
    adc = machine.ADC()
    apin = adc.channel(pin='G3')
    
    # 创建一个定时器对象
    tim = machine.Timer.Alarm()
    
    # 定义一个发送数据的函数,作为定时器的回调函数
    def send_data(tim):
        # 读取温度传感器的电压值,转换为摄氏度
        volts = apin.voltage()
        celsius = (volts - 500) / 10
        # 打印温度值
        print('Temperature: {} C'.format(celsius))
        # 使用自己的 ThingSpeak API 密钥和温度值构造一个 URL
        url = 'https://api.thingspeak.com/update?api_key=YourAPIKey&field1={}'.format(celsius)
        # 向 URL 发送一个 GET 请求,获取响应对象
        response = urequests.get(url)
        # 打印响应状态码和内容
        print('Status: {}'.format(response.status_code))
        print('Content: {}'.format(response.text))
        # 关闭响应对象
        response.close()
    
    # 启动定时器,每隔 10 秒钟调用一次发送数据的函数
    tim.callback(send_data, ms=10000)
    

    tcp服务接受与发送

    from network import WLAN  
    import socket  
      
    # 初始化WLAN模块  
    wlan = WLAN(WLAN.STA)  
      
    # 连接到WiFi网络  
    wlan.connect("MyWiFi", "mypassword")  
      
    # 等待连接成功  
    while not wlan.isconnected():  
        pass  
      
    # 创建TCP套接字并绑定到本地IP地址和端口号  
    server_address = ('localhost', 12345)  
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
    sock.bind(server_address)  
      
    # 监听连接请求  
    sock.listen(1)  
    print("等待连接...")  
      
    # 接受客户端连接请求并发送数据到服务器  
    while True:  
        client, addr = sock.accept()  
        print("连接来自:", addr)  
        data = "Hello, client!"  
        client.send(data.encode())  
        client.close()
    

    使用HTTP库访问互联网上的资源

    import usocket  
    import ubinascii  
    import ure  # 正则表达式库  
    import ujson  # JSON库  
    from network import WLAN  
      
    # 初始化WLAN模块并连接到WiFi网络  
    wlan = WLAN(WLAN.STA)  
    wlan.connect("MyWiFi", "mypassword")  
    while not wlan.isconnected():  
        pass  
      
    # 创建TCP套接字并连接到目标服务器  
    server_address = ('www.example.com', 80)  
    sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)  
    sock.connect(server_address)  
      
    # 发送HTTP请求并接收响应数据  
    request = b'GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n'  
    sock.send(request)  
    response = sock.recv(1024)  
    print("响应数据:", response)  
    sock.close()
    

    使用 Blynk 库来控制 WiPy 上的 LED 灯的案例

    Blynk 是一个用于物联网项目的平台,可以通过手机应用程序远程控制 WiPy 上的硬件。代码如下:

    # 导入 Blynk 库
    from blynk import Blynk
    
    # 创建一个 Blynk 实例,使用自己的授权码
    b = Blynk('YourAuthToken')
    
    # 定义一个回调函数,当虚拟引脚 V0 收到数据时触发
    @b.VIRTUAL_WRITE(0)
    def my_write_handler(value):
        # value 是一个字符串列表,例如 ['1']
        # 将字符串转换为整数
        value = int(value[0])
        # 如果值为 1,则点亮 WiPy 上的蓝色 LED
        if value == 1:
            pycom.rgbled(0x0000ff)
        # 如果值为 0,则关闭 LED
        elif value == 0:
            pycom.rgbled(0x000000)
    
    # 启动 Blynk
    b.run()
    

    web服务端基础代码

    import socket, time, re
    html="""
        <html>
        <head>
          <META HTTP-EQUIV="Content-Type" CONTENT="text/html">
          <title>LED控制</title>
        </head>
        <body>
        <form method="post" action="12.php">
            <span id="status" name="status">%s</span>
            <input type="button" value="转换" onclick="onSubmit()">
          <input type="text" value="转换" name=post>
          <input type="submit" value="sub" >
        </from>
        </body>
        </html>
        <script>
        function onSubmit(){
        if (window.XMLHttpRequest) {
            // 用于现代浏览器的代码,code for IE7+, Firefox, Chrome, Opera, Safari
             xmlhttp = new XMLHttpRequest();
         } else {
            // 应对老版本 IE 浏览器的代码,// code for IE6, IE5
             xmlhttp = new ActiveXObject("Microsoft.XMLHTTP");
         }
        
        xmlhttp.onreadystatechange = function() {
          if (xmlhttp.readyState == 4 && xmlhttp.status == 200) {
            document.getElementById("status").innerHTML = xmlhttp.responseText;
            // document.getElementById("status").value = xmlhttp.responseText;
          }
        }
        xmlhttp.open("GET", "led", true);
        xmlhttp.send("ssss");
        }
        </script>
    """
    a=1
    ip = "127.0.0.1"
    port = 80
    webserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)    #创建套接字
    webserver.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  #设置给定套接字选项的值
    #webserver.settimeout(2000)
    #ip="192.168.4.1" #AP's IP
    webserver.bind((ip, port))                                       #绑定IP地址和端口号
    webserver.listen(5)                                              #监听套接字
    print("服务器地址:%s:%d" %(ip,port))
     
    while True:
      conn, addr = webserver.accept()                                #接受一个连接,conn是一个新的socket对象
      request = conn.recv(1024)                              #从套接字接收1024字节的数据
     
     
      if len(request)>0:
        request = request.decode()
        result = re.search("(.*?) (.*?) HTTP/1.1", request)
        if result:
          method = result.group(1)
          url = result.group(2)
          print(url)
          if method == "POST":#获得post数据
            postdata = re.search(".*?\r\n\r\n(.*)", request).group(1)
            if postdata:
              lists = postdata.split("&")
              payload = {}#将post数据存数组
              for list in lists:
                k,v = list.split("=")
                payload[k]=v
              #print(payload)
           
          #conn.sendall("HTTP/1.1 200 OK\nConnection: close\nServer: Esp8266\nContent-Type: text/html;charset=UTF-8\n\n")
          conn.send("HTTP/1.1 200 OK\r\n")
          conn.send("Server: Esp8266\r\n")
          conn.send("Content-Type: text/html;charset=UTF-8\r\n")
          conn.send("Connection: close\r\n")
          conn.send("\r\n")
          if url =="/":
            conn.sendall(html)
          elif url == "/12.php":
            conn.sendall(payload["post"])
          conn.send("\r\n")  # 发送结束
        else:
          print("not found url")
      else:
        print("no request")
      conn.close()
      print("out %s" % str(addr))
    
    

    相关文章

      网友评论

          本文标题:esp8266-micropython常用类库WiFi,web,

          本文链接:https://www.haomeiwen.com/subject/djlpxjtx.html