美文网首页
esp8266-micropython常用类库WiFi,web,

esp8266-micropython常用类库WiFi,web,

作者: yichen_china | 来源:发表于2024-04-20 14:17 被阅读0次

烧录

进入固件官网:https://micropython.org/download/esp8266/

开关灯

from machine import Pin
import time
LED = Pin(2, Pin.OUT)
while True:
    LED.value(1)
    time.sleep(1)
    LED.value(0)
    time.sleep(1)

wifi联网

from machine import WiFi
import time
import usocket as socket
import network
def wifi_connect(wlan):
#     if not wlan.isconnected():
        # 打印正在连接信息
    print('connecting to network...')
    wifiname = 'hy1'  # 替换SSID
    password = '13363641456'  # 替换password
    print("正在链接WIFI:",wifiname,",pass:",password)
    # 要连接的WiFi名,密码
    wlan.connect(wifiname, password) # connect to an AP
    # 死循环,直到连接成功  #可以尝试添加时间超限
    while not wlan.isconnected():
        # 打印连接信息
        print("正在重试链接WIFI:",wifiname)
        # 休息3s再继续打印
        time.sleep(1)
        pass
    # 打印网络配置信息,包括IP地址等
    print('network config:', wlan.ifconfig())
    time.sleep(1)
    return wlan.isconnected()
    #server.start()
def wifi_init():
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    all_wifi_info = wlan.scan()
#     time.sleep(1)
    for wifi_info in all_wifi_info:#wifi_name 是个元组类型数据 index 0 是 WiFi名字  数据类型都是bytes
        if wifi_info[0] != b'':#舍弃空名字
            print(wifi_info[0].decode("utf-8"))#bytes 转 str  解码
    print('------')
    print('发现的可用WiFi数量{}'.format(len(all_wifi_info)))
    return wlan

使用方法

    wlan = wifi_init()
    result = wifi_connect(wlan)

设置wifi链接超时时间

from machine import WiFi
import time

wifi = WiFi()

# 设置连接超时时间为10秒
wifi.config(sta_timeout=10)

while True:
    # 等待WiFi连接成功或超时
    connected = wifi.wait_for_connection(timeout=10)
    if connected:
        print("Connected to network.")
    else:
        print("Connection timed out.")
    time.sleep(5)  # 每5秒检查一次连接状态

安装依赖库

联网后控制台可以使用 micropython 相关语法了如下:
控制台可以直接使用函数等功能


>>> upip.install('umqtt.simple')
>>> import ujson  # JSON库 
>>> import ure  # 正则表达式库 
# 导入 urequests 库和 machine 库
>>>import urequests

web服务 WebService.py

import socket, time, re
#import socket
class WebService:
    def __init__(self,SERVER,PORT,):
        self.SERVER = SERVER
        self.PORT = PORT
        self.CLIENT = None
        print("启动web服务器2")
    def run(self):
        webserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)    #创建套接字
        webserver.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  #设置给定套接字选项的值
        #webserver.settimeout(2000)
        #ip="192.168.4.1" #AP's IP
#         a=1
        webserver.bind((self.SERVER, self.PORT))                                       #绑定IP地址和端口号
        webserver.listen(5)                                              #监听套接字
        print("web:",self.SERVER,":",self.PORT)
#         print(self.PORT)
        while True:
            conn, addr = webserver.accept()                                #接受一个连接,conn是一个新的socket对象
            request = conn.recv(1024)                              #从套接字接收1024字节的数据
             # 打印客户端发送的数据
            print('Content = %s' % str(request))
            if len(request)>0:
                request = request.decode()
                result = re.search("(.*?) (.*?) HTTP/1.1", request)
                if result:
                    method = result.group(1)
                    url = result.group(2)
                    print(url)
                    if method == "POST":#获得post数据
                        postdata = re.search(".*?\r\n\r\n(.*)", request).group(1)
                        if postdata:
                            lists = postdata.split("&")
                            payload = {}#将post数据存数组
                            for list in lists:
                                k,v = list.split("=")
                                payload[k]=v
                      #print(payload)

                    #conn.sendall("HTTP/1.1 200 OK\nConnection: close\nServer: Esp8266\nContent-Type: text/html;charset=UTF-8\n\n")
                    conn.send("HTTP/1.1 200 OK\r\n")
                    conn.send("Server: Esp8266\r\n")
                    conn.send("Content-Type: text/html;charset=UTF-8\r\n")
                    conn.send("Connection: close\r\n")
                    conn.send("\r\n")
                    if url =="/":
                        conn.sendall(html)
                    elif url == "/12.php":
                        conn.sendall(payload["post"])
                        conn.send("\r\n")  # 发送结束
                    else:
                        print("not found url")
                    else:
                        print("no request")
                        conn.close()
                        print("out %s" % str(addr))

使用

from src.WebService import WebService
web = WebService("0.0.0.0",80)
web.run()

mqqt服务

依赖umqtt.simple
安装依赖,thonny连接终端,先配置好wifi联网后,在8266系统内终端执行如下命令,或者代码里加一行代码安装

upip.install('umqtt.simple')
from umqtt.simple import MQTTClient
class MQTTService:
    def __init__(self,CLIENT_ID,SERVER,PORT,):
        self.CLIENT_ID = CLIENT_ID
        self.SERVER = SERVER
        self.PORT = PORT
        self.CLIENT = None
        
    def user_connect(self):
        self.CLIENT = MQTTClient(self.CLIENT_ID, self.SERVER, self.PORT)
        self.CLIENT.set_callback(self.sub_cb)  # 设置回调函数
        print('尝试连接"{server}"'.format(server = self.CLIENT_ID))
        # self.CLIENT.set_callback(sub_cb)
        try:
            self.CLIENT.connect()
            #print('Connected to MQTT Broker "{server}"'.format(server = self.SERVER))
        except:
            print('失败尝试重新连接')
            self.user_connect()
            
        #return self.client
 
    def sub_cb(self,topic, msg): # 回调函数,收到服务器消息后会调用这个函数
        print('{}   {} '.format(topic.decode("utf-8"), msg.decode("utf-8")))
 
 
    def subscribe(self, topic):  # 修正变量名,使用方法参数而不是实例属性  
        self.CLIENT.subscribe(topic)  # 订阅传入的topic  
 
 
    def check_msg(self):  
        self.CLIENT.check_msg()  
 
    def disconnect(self):  
 
        if self.CLIENT:  
 
            self.CLIENT.disconnect()  
 
            print('已从MQTT代理断开连接')  

使用

from src.MQTTService import MQTTService
mqtt = MQTTService("message","127.0.0.1",5535)
mqtt.user_connect()

http请求库urequests

发送网络请求, 它会阻塞返回网络的响应数据,参数:

method 建立网络请求的方法,例如 HEAD,GET,POST,PUT,PATCH, DELETE。
url 网络请求的URL(网址)。
data(可选)在请求正文中发送的字典或元组列表[(键,值)](将是表单编码的),字节或类文件对象。
json(可选)在请求正文中发送的json数据。
headers(可选)要与请求一起发送的HTTP标头字典。

urequests.request(method, url, data=None, json=None, headers={})

更多方法

import urequests
# 发送一个 HEAD 请求 **kw request可选的参数,他就是request函数url参数后面的那些
urequests.head(url, **kw)
##get
urequests.get(url, **kw)
# post
urequests.post(url, **kw)
urequests.put(url, **kw)
urequests.patch(url, **kw)
urequests.delete(url, **kw)

返回响应的内容,以字节为单位。
urequests.content
以文本方式返回响应的内容,编码为unicode。
urequests.text
返回响应的json编码内容并转为dict类型。
urequests.json()
示例
文档https://mpython.readthedocs.io/zh/master/library/mPython/urequests.html

import urequests

url = "https://api.example.com/data"  # 替换为您要请求的URL

# 发送GET请求
response = urequests.get(url)

# 检查响应状态码
if response.status_code == 200:
    # 获取响应内容
    content = response.text
    print("Response:", content)
else:
    print("Error:", response.status_code)

# 关闭响应连接
response.close()

定时器+urequests完成http请求温度传感器

requests 库来向 ThingSpeak 平台发送温度数据

# 导入 urequests 库和 machine 库
import urequests
import machine

# 创建一个 ADC 对象,连接到 G3 引脚(WiPy 上的温度传感器)
adc = machine.ADC()
apin = adc.channel(pin='G3')

# 创建一个定时器对象
tim = machine.Timer.Alarm()

# 定义一个发送数据的函数,作为定时器的回调函数
def send_data(tim):
    # 读取温度传感器的电压值,转换为摄氏度
    volts = apin.voltage()
    celsius = (volts - 500) / 10
    # 打印温度值
    print('Temperature: {} C'.format(celsius))
    # 使用自己的 ThingSpeak API 密钥和温度值构造一个 URL
    url = 'https://api.thingspeak.com/update?api_key=YourAPIKey&field1={}'.format(celsius)
    # 向 URL 发送一个 GET 请求,获取响应对象
    response = urequests.get(url)
    # 打印响应状态码和内容
    print('Status: {}'.format(response.status_code))
    print('Content: {}'.format(response.text))
    # 关闭响应对象
    response.close()

# 启动定时器,每隔 10 秒钟调用一次发送数据的函数
tim.callback(send_data, ms=10000)

tcp服务接受与发送

from network import WLAN  
import socket  
  
# 初始化WLAN模块  
wlan = WLAN(WLAN.STA)  
  
# 连接到WiFi网络  
wlan.connect("MyWiFi", "mypassword")  
  
# 等待连接成功  
while not wlan.isconnected():  
    pass  
  
# 创建TCP套接字并绑定到本地IP地址和端口号  
server_address = ('localhost', 12345)  
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
sock.bind(server_address)  
  
# 监听连接请求  
sock.listen(1)  
print("等待连接...")  
  
# 接受客户端连接请求并发送数据到服务器  
while True:  
    client, addr = sock.accept()  
    print("连接来自:", addr)  
    data = "Hello, client!"  
    client.send(data.encode())  
    client.close()

使用HTTP库访问互联网上的资源

import usocket  
import ubinascii  
import ure  # 正则表达式库  
import ujson  # JSON库  
from network import WLAN  
  
# 初始化WLAN模块并连接到WiFi网络  
wlan = WLAN(WLAN.STA)  
wlan.connect("MyWiFi", "mypassword")  
while not wlan.isconnected():  
    pass  
  
# 创建TCP套接字并连接到目标服务器  
server_address = ('www.example.com', 80)  
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)  
sock.connect(server_address)  
  
# 发送HTTP请求并接收响应数据  
request = b'GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n'  
sock.send(request)  
response = sock.recv(1024)  
print("响应数据:", response)  
sock.close()

使用 Blynk 库来控制 WiPy 上的 LED 灯的案例

Blynk 是一个用于物联网项目的平台,可以通过手机应用程序远程控制 WiPy 上的硬件。代码如下:

# 导入 Blynk 库
from blynk import Blynk

# 创建一个 Blynk 实例,使用自己的授权码
b = Blynk('YourAuthToken')

# 定义一个回调函数,当虚拟引脚 V0 收到数据时触发
@b.VIRTUAL_WRITE(0)
def my_write_handler(value):
    # value 是一个字符串列表,例如 ['1']
    # 将字符串转换为整数
    value = int(value[0])
    # 如果值为 1,则点亮 WiPy 上的蓝色 LED
    if value == 1:
        pycom.rgbled(0x0000ff)
    # 如果值为 0,则关闭 LED
    elif value == 0:
        pycom.rgbled(0x000000)

# 启动 Blynk
b.run()

web服务端基础代码

import socket, time, re
html="""
    <html>
    <head>
      <META HTTP-EQUIV="Content-Type" CONTENT="text/html">
      <title>LED控制</title>
    </head>
    <body>
    <form method="post" action="12.php">
        <span id="status" name="status">%s</span>
        <input type="button" value="转换" onclick="onSubmit()">
      <input type="text" value="转换" name=post>
      <input type="submit" value="sub" >
    </from>
    </body>
    </html>
    <script>
    function onSubmit(){
    if (window.XMLHttpRequest) {
        // 用于现代浏览器的代码,code for IE7+, Firefox, Chrome, Opera, Safari
         xmlhttp = new XMLHttpRequest();
     } else {
        // 应对老版本 IE 浏览器的代码,// code for IE6, IE5
         xmlhttp = new ActiveXObject("Microsoft.XMLHTTP");
     }
    
    xmlhttp.onreadystatechange = function() {
      if (xmlhttp.readyState == 4 && xmlhttp.status == 200) {
        document.getElementById("status").innerHTML = xmlhttp.responseText;
        // document.getElementById("status").value = xmlhttp.responseText;
      }
    }
    xmlhttp.open("GET", "led", true);
    xmlhttp.send("ssss");
    }
    </script>
"""
a=1
ip = "127.0.0.1"
port = 80
webserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)    #创建套接字
webserver.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  #设置给定套接字选项的值
#webserver.settimeout(2000)
#ip="192.168.4.1" #AP's IP
webserver.bind((ip, port))                                       #绑定IP地址和端口号
webserver.listen(5)                                              #监听套接字
print("服务器地址:%s:%d" %(ip,port))
 
while True:
  conn, addr = webserver.accept()                                #接受一个连接,conn是一个新的socket对象
  request = conn.recv(1024)                              #从套接字接收1024字节的数据
 
 
  if len(request)>0:
    request = request.decode()
    result = re.search("(.*?) (.*?) HTTP/1.1", request)
    if result:
      method = result.group(1)
      url = result.group(2)
      print(url)
      if method == "POST":#获得post数据
        postdata = re.search(".*?\r\n\r\n(.*)", request).group(1)
        if postdata:
          lists = postdata.split("&")
          payload = {}#将post数据存数组
          for list in lists:
            k,v = list.split("=")
            payload[k]=v
          #print(payload)
       
      #conn.sendall("HTTP/1.1 200 OK\nConnection: close\nServer: Esp8266\nContent-Type: text/html;charset=UTF-8\n\n")
      conn.send("HTTP/1.1 200 OK\r\n")
      conn.send("Server: Esp8266\r\n")
      conn.send("Content-Type: text/html;charset=UTF-8\r\n")
      conn.send("Connection: close\r\n")
      conn.send("\r\n")
      if url =="/":
        conn.sendall(html)
      elif url == "/12.php":
        conn.sendall(payload["post"])
      conn.send("\r\n")  # 发送结束
    else:
      print("not found url")
  else:
    print("no request")
  conn.close()
  print("out %s" % str(addr))

相关文章

网友评论

      本文标题:esp8266-micropython常用类库WiFi,web,

      本文链接:https://www.haomeiwen.com/subject/djlpxjtx.html