import gc
import network
gc.collect()
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
wifiName = "CMCC-PebQ"
wifiPassword = "your wifi password"
print('connecting to wifi network...')
wlan.connect(wifiName, wifiPassword)
print('wifi connected .')
编辑完之后将其拖到 NodeMCU 设备中,打开 Mu 中的 REPL,设备会自动执行 boot.py 中的程序开始连接网络:
然后在终端里输入:
安装http请求依赖
>>> import upip
>>> upip.install('micropython-urequests')
>>> import urequests as uq
>>> res = uq.get("http://api.example.com")
>>> res.text
'{"code":0,"data":{"title":"poetry api server","version":"0.1.0","stage":"prod"},"msg":"ok"}'
>>> res.json()
{'code': 0, 'data': {'title': 'poetry api server', 'stage': 'prod', 'version': '0.1.0'}, 'msg': 'ok'}
发送 mqtt消息
import urequests
# MQTT 代理服务器地址
mqtt_broker_address = "http://your-mqtt-broker.com"
# 发送 MQTT 消息的函数
def send_mqtt_message(topic, message):
# 构造 HTTP 请求的 payload
payload = {
'topic': topic,
'message': message
}
# 发送 POST 请求到 MQTT 代理服务器
response = urequests.post(f'{mqtt_broker_address}/mqtt/publish', json=payload)
# 打印响应状态
print(f'Status: {response.status_code}')
# 关闭响应
response.close()
# 发送一条 MQTT 消息
send_mqtt_message('your/mqtt/topic', 'Hello, MQTT!')
安装web框架
import upip
upip.install('microdot')
安装mqqt
upip.install('umqtt.simple')
1.工具类MQTT服务
class MQTTService:
def __init__(self,CLIENT_ID,SERVER,PORT,):
self.CLIENT_ID = CLIENT_ID
self.SERVER = SERVER
self.PORT = PORT
self.CLIENT = None
def user_connect(self):
self.CLIENT = MQTTClient(self.CLIENT_ID, self.SERVER, self.PORT)
self.CLIENT.set_callback(self.sub_cb) # 设置回调函数
print('尝试连接')
# self.CLIENT.set_callback(sub_cb)
self.CLIENT.connect()
print('Connected to MQTT Broker "{server}"'.format(server = self.SERVER))
# return self.client
def sub_cb(self,topic, msg): # 回调函数,收到服务器消息后会调用这个函数
print('{} {} '.format(topic.decode("utf-8"), msg.decode("utf-8")))
def subscribe(self, topic): # 修正变量名,使用方法参数而不是实例属性
self.CLIENT.subscribe(topic) # 订阅传入的topic
def check_msg(self):
self.CLIENT.check_msg()
def disconnect(self):
if self.CLIENT:
self.CLIENT.disconnect()
print('已从MQTT代理断开连接')
2.工具类 链接wifi
def wifi_connect():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
all_wifi_info = wlan.scan()
time.sleep(2)
for wifi_info in all_wifi_info:#wifi_name 是个元组类型数据 index 0 是 WiFi名字 数据类型都是bytes
if wifi_info[0] != b'':#舍弃空名字
print(wifi_info[0].decode("utf-8"))#bytes 转 str 解码
print('------')
print('发现的可用WiFi数量{}'.format(len(all_wifi_info)))
if not wlan.isconnected():
# 打印正在连接信息
print('connecting to network...')
# 要连接的WiFi名,密码
wlan.connect('TP-LINK', '12345678') # connect to an AP
# 死循环,直到连接成功 #可以尝试添加时间超限
while not wlan.isconnected():
# 打印连接信息
print("正在链接...")
# 休息3s再继续打印
time.sleep(3)
# print(wlan.isconnected()) # check if the station is connected to an AP
print('network config:', wlan.ifconfig())
time.sleep(1)
网友评论