烧录
进入固件官网:https://micropython.org/download/esp8266/
开关灯
from machine import Pin
import time
LED = Pin(2, Pin.OUT)
while True:
LED.value(1)
time.sleep(1)
LED.value(0)
time.sleep(1)
wifi联网
from machine import WiFi
import time
import usocket as socket
import network
def wifi_connect(wlan):
# if not wlan.isconnected():
# 打印正在连接信息
print('connecting to network...')
wifiname = 'hy1' # 替换SSID
password = '13363641456' # 替换password
print("正在链接WIFI:",wifiname,",pass:",password)
# 要连接的WiFi名,密码
wlan.connect(wifiname, password) # connect to an AP
# 死循环,直到连接成功 #可以尝试添加时间超限
while not wlan.isconnected():
# 打印连接信息
print("正在重试链接WIFI:",wifiname)
# 休息3s再继续打印
time.sleep(1)
pass
# 打印网络配置信息,包括IP地址等
print('network config:', wlan.ifconfig())
time.sleep(1)
return wlan.isconnected()
#server.start()
def wifi_init():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
all_wifi_info = wlan.scan()
# time.sleep(1)
for wifi_info in all_wifi_info:#wifi_name 是个元组类型数据 index 0 是 WiFi名字 数据类型都是bytes
if wifi_info[0] != b'':#舍弃空名字
print(wifi_info[0].decode("utf-8"))#bytes 转 str 解码
print('------')
print('发现的可用WiFi数量{}'.format(len(all_wifi_info)))
return wlan
使用方法
wlan = wifi_init()
result = wifi_connect(wlan)
设置wifi链接超时时间
from machine import WiFi
import time
wifi = WiFi()
# 设置连接超时时间为10秒
wifi.config(sta_timeout=10)
while True:
# 等待WiFi连接成功或超时
connected = wifi.wait_for_connection(timeout=10)
if connected:
print("Connected to network.")
else:
print("Connection timed out.")
time.sleep(5) # 每5秒检查一次连接状态
安装依赖库
联网后控制台可以使用 micropython 相关语法了如下:
控制台可以直接使用函数等功能
>>> upip.install('umqtt.simple')
>>> import ujson # JSON库
>>> import ure # 正则表达式库
# 导入 urequests 库和 machine 库
>>>import urequests
web服务 WebService.py
import socket, time, re
#import socket
class WebService:
def __init__(self,SERVER,PORT,):
self.SERVER = SERVER
self.PORT = PORT
self.CLIENT = None
print("启动web服务器2")
def run(self):
webserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #创建套接字
webserver.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #设置给定套接字选项的值
#webserver.settimeout(2000)
#ip="192.168.4.1" #AP's IP
# a=1
webserver.bind((self.SERVER, self.PORT)) #绑定IP地址和端口号
webserver.listen(5) #监听套接字
print("web:",self.SERVER,":",self.PORT)
# print(self.PORT)
while True:
conn, addr = webserver.accept() #接受一个连接,conn是一个新的socket对象
request = conn.recv(1024) #从套接字接收1024字节的数据
# 打印客户端发送的数据
print('Content = %s' % str(request))
if len(request)>0:
request = request.decode()
result = re.search("(.*?) (.*?) HTTP/1.1", request)
if result:
method = result.group(1)
url = result.group(2)
print(url)
if method == "POST":#获得post数据
postdata = re.search(".*?\r\n\r\n(.*)", request).group(1)
if postdata:
lists = postdata.split("&")
payload = {}#将post数据存数组
for list in lists:
k,v = list.split("=")
payload[k]=v
#print(payload)
#conn.sendall("HTTP/1.1 200 OK\nConnection: close\nServer: Esp8266\nContent-Type: text/html;charset=UTF-8\n\n")
conn.send("HTTP/1.1 200 OK\r\n")
conn.send("Server: Esp8266\r\n")
conn.send("Content-Type: text/html;charset=UTF-8\r\n")
conn.send("Connection: close\r\n")
conn.send("\r\n")
if url =="/":
conn.sendall(html)
elif url == "/12.php":
conn.sendall(payload["post"])
conn.send("\r\n") # 发送结束
else:
print("not found url")
else:
print("no request")
conn.close()
print("out %s" % str(addr))
使用
from src.WebService import WebService
web = WebService("0.0.0.0",80)
web.run()
mqqt服务
依赖umqtt.simple
安装依赖,thonny连接终端,先配置好wifi联网后,在8266系统内终端执行如下命令,或者代码里加一行代码安装
upip.install('umqtt.simple')
from umqtt.simple import MQTTClient
class MQTTService:
def __init__(self,CLIENT_ID,SERVER,PORT,):
self.CLIENT_ID = CLIENT_ID
self.SERVER = SERVER
self.PORT = PORT
self.CLIENT = None
def user_connect(self):
self.CLIENT = MQTTClient(self.CLIENT_ID, self.SERVER, self.PORT)
self.CLIENT.set_callback(self.sub_cb) # 设置回调函数
print('尝试连接"{server}"'.format(server = self.CLIENT_ID))
# self.CLIENT.set_callback(sub_cb)
try:
self.CLIENT.connect()
#print('Connected to MQTT Broker "{server}"'.format(server = self.SERVER))
except:
print('失败尝试重新连接')
self.user_connect()
#return self.client
def sub_cb(self,topic, msg): # 回调函数,收到服务器消息后会调用这个函数
print('{} {} '.format(topic.decode("utf-8"), msg.decode("utf-8")))
def subscribe(self, topic): # 修正变量名,使用方法参数而不是实例属性
self.CLIENT.subscribe(topic) # 订阅传入的topic
def check_msg(self):
self.CLIENT.check_msg()
def disconnect(self):
if self.CLIENT:
self.CLIENT.disconnect()
print('已从MQTT代理断开连接')
使用
from src.MQTTService import MQTTService
mqtt = MQTTService("message","127.0.0.1",5535)
mqtt.user_connect()
http请求库urequests
发送网络请求, 它会阻塞返回网络的响应数据,参数:
method 建立网络请求的方法,例如 HEAD,GET,POST,PUT,PATCH, DELETE。
url 网络请求的URL(网址)。
data(可选)在请求正文中发送的字典或元组列表[(键,值)](将是表单编码的),字节或类文件对象。
json(可选)在请求正文中发送的json数据。
headers(可选)要与请求一起发送的HTTP标头字典。
urequests.request(method, url, data=None, json=None, headers={})
更多方法
import urequests
# 发送一个 HEAD 请求 **kw request可选的参数,他就是request函数url参数后面的那些
urequests.head(url, **kw)
##get
urequests.get(url, **kw)
# post
urequests.post(url, **kw)
urequests.put(url, **kw)
urequests.patch(url, **kw)
urequests.delete(url, **kw)
返回响应的内容,以字节为单位。
urequests.content
以文本方式返回响应的内容,编码为unicode。
urequests.text
返回响应的json编码内容并转为dict类型。
urequests.json()
示例
文档https://mpython.readthedocs.io/zh/master/library/mPython/urequests.html
import urequests
url = "https://api.example.com/data" # 替换为您要请求的URL
# 发送GET请求
response = urequests.get(url)
# 检查响应状态码
if response.status_code == 200:
# 获取响应内容
content = response.text
print("Response:", content)
else:
print("Error:", response.status_code)
# 关闭响应连接
response.close()
定时器+urequests完成http请求温度传感器
requests 库来向 ThingSpeak 平台发送温度数据
# 导入 urequests 库和 machine 库
import urequests
import machine
# 创建一个 ADC 对象,连接到 G3 引脚(WiPy 上的温度传感器)
adc = machine.ADC()
apin = adc.channel(pin='G3')
# 创建一个定时器对象
tim = machine.Timer.Alarm()
# 定义一个发送数据的函数,作为定时器的回调函数
def send_data(tim):
# 读取温度传感器的电压值,转换为摄氏度
volts = apin.voltage()
celsius = (volts - 500) / 10
# 打印温度值
print('Temperature: {} C'.format(celsius))
# 使用自己的 ThingSpeak API 密钥和温度值构造一个 URL
url = 'https://api.thingspeak.com/update?api_key=YourAPIKey&field1={}'.format(celsius)
# 向 URL 发送一个 GET 请求,获取响应对象
response = urequests.get(url)
# 打印响应状态码和内容
print('Status: {}'.format(response.status_code))
print('Content: {}'.format(response.text))
# 关闭响应对象
response.close()
# 启动定时器,每隔 10 秒钟调用一次发送数据的函数
tim.callback(send_data, ms=10000)
tcp服务接受与发送
from network import WLAN
import socket
# 初始化WLAN模块
wlan = WLAN(WLAN.STA)
# 连接到WiFi网络
wlan.connect("MyWiFi", "mypassword")
# 等待连接成功
while not wlan.isconnected():
pass
# 创建TCP套接字并绑定到本地IP地址和端口号
server_address = ('localhost', 12345)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(server_address)
# 监听连接请求
sock.listen(1)
print("等待连接...")
# 接受客户端连接请求并发送数据到服务器
while True:
client, addr = sock.accept()
print("连接来自:", addr)
data = "Hello, client!"
client.send(data.encode())
client.close()
使用HTTP库访问互联网上的资源
import usocket
import ubinascii
import ure # 正则表达式库
import ujson # JSON库
from network import WLAN
# 初始化WLAN模块并连接到WiFi网络
wlan = WLAN(WLAN.STA)
wlan.connect("MyWiFi", "mypassword")
while not wlan.isconnected():
pass
# 创建TCP套接字并连接到目标服务器
server_address = ('www.example.com', 80)
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
sock.connect(server_address)
# 发送HTTP请求并接收响应数据
request = b'GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n'
sock.send(request)
response = sock.recv(1024)
print("响应数据:", response)
sock.close()
使用 Blynk 库来控制 WiPy 上的 LED 灯的案例
Blynk 是一个用于物联网项目的平台,可以通过手机应用程序远程控制 WiPy 上的硬件。代码如下:
# 导入 Blynk 库
from blynk import Blynk
# 创建一个 Blynk 实例,使用自己的授权码
b = Blynk('YourAuthToken')
# 定义一个回调函数,当虚拟引脚 V0 收到数据时触发
@b.VIRTUAL_WRITE(0)
def my_write_handler(value):
# value 是一个字符串列表,例如 ['1']
# 将字符串转换为整数
value = int(value[0])
# 如果值为 1,则点亮 WiPy 上的蓝色 LED
if value == 1:
pycom.rgbled(0x0000ff)
# 如果值为 0,则关闭 LED
elif value == 0:
pycom.rgbled(0x000000)
# 启动 Blynk
b.run()
web服务端基础代码
import socket, time, re
html="""
<html>
<head>
<META HTTP-EQUIV="Content-Type" CONTENT="text/html">
<title>LED控制</title>
</head>
<body>
<form method="post" action="12.php">
<span id="status" name="status">%s</span>
<input type="button" value="转换" onclick="onSubmit()">
<input type="text" value="转换" name=post>
<input type="submit" value="sub" >
</from>
</body>
</html>
<script>
function onSubmit(){
if (window.XMLHttpRequest) {
// 用于现代浏览器的代码,code for IE7+, Firefox, Chrome, Opera, Safari
xmlhttp = new XMLHttpRequest();
} else {
// 应对老版本 IE 浏览器的代码,// code for IE6, IE5
xmlhttp = new ActiveXObject("Microsoft.XMLHTTP");
}
xmlhttp.onreadystatechange = function() {
if (xmlhttp.readyState == 4 && xmlhttp.status == 200) {
document.getElementById("status").innerHTML = xmlhttp.responseText;
// document.getElementById("status").value = xmlhttp.responseText;
}
}
xmlhttp.open("GET", "led", true);
xmlhttp.send("ssss");
}
</script>
"""
a=1
ip = "127.0.0.1"
port = 80
webserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #创建套接字
webserver.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #设置给定套接字选项的值
#webserver.settimeout(2000)
#ip="192.168.4.1" #AP's IP
webserver.bind((ip, port)) #绑定IP地址和端口号
webserver.listen(5) #监听套接字
print("服务器地址:%s:%d" %(ip,port))
while True:
conn, addr = webserver.accept() #接受一个连接,conn是一个新的socket对象
request = conn.recv(1024) #从套接字接收1024字节的数据
if len(request)>0:
request = request.decode()
result = re.search("(.*?) (.*?) HTTP/1.1", request)
if result:
method = result.group(1)
url = result.group(2)
print(url)
if method == "POST":#获得post数据
postdata = re.search(".*?\r\n\r\n(.*)", request).group(1)
if postdata:
lists = postdata.split("&")
payload = {}#将post数据存数组
for list in lists:
k,v = list.split("=")
payload[k]=v
#print(payload)
#conn.sendall("HTTP/1.1 200 OK\nConnection: close\nServer: Esp8266\nContent-Type: text/html;charset=UTF-8\n\n")
conn.send("HTTP/1.1 200 OK\r\n")
conn.send("Server: Esp8266\r\n")
conn.send("Content-Type: text/html;charset=UTF-8\r\n")
conn.send("Connection: close\r\n")
conn.send("\r\n")
if url =="/":
conn.sendall(html)
elif url == "/12.php":
conn.sendall(payload["post"])
conn.send("\r\n") # 发送结束
else:
print("not found url")
else:
print("no request")
conn.close()
print("out %s" % str(addr))
网友评论