"""
ESP32 + MicroPython + MQTT 连接 OneNET 平台
功能:控制LED的开关状态(Bool类型)+ DHT22温湿度传感器(int32类型)
在Wokwi仿真平台中测试通过
"""
import network
import time
from machine import Pin
import dht
import ujson
from umqtt.simple import MQTTClient
# ==================== OneNET配置信息 ====================
# 产品ID
PRODUCT_ID = "8FQ1zo61Yf"
# 设备名称/ID
DEVICE_NAME = "breath"
AUTH_TOKEN = "version=2018-10-31&res=products%2F8FQ1zo61Yf%2Fdevices%2Fbreath&et=1793850960&method=md5&sign=l%2BztMig8zILn%2Fj3t7H3t6Q%3D%3D"
MQTT_SERVER = "mqtts.heclouds.com"
MQTT_PORT = 1883
# MQTT主题定义
# 属性上报主题(OneJSON格式)
TOPIC_PROPERTY_POST = f"$sys/{PRODUCT_ID}/{DEVICE_NAME}/thing/property/post"
# 属性设置主题(接收云平台下发的命令)
TOPIC_PROPERTY_SET = f"$sys/{PRODUCT_ID}/{DEVICE_NAME}/thing/property/set"
# 属性设置响应主题
TOPIC_PROPERTY_SET_REPLY = f"$sys/{PRODUCT_ID}/{DEVICE_NAME}/thing/property/set_reply"
# 属性上报响应主题(用于调试,查看上报是否成功)
TOPIC_PROPERTY_POST_REPLY = f"$sys/{PRODUCT_ID}/{DEVICE_NAME}/thing/property/post/reply"
# LED引脚配置
LED_PIN = 2 # ESP32内置LED通常在GPIO2
led = Pin(LED_PIN, Pin.OUT)
# DHT22传感器配置
DHT22_PIN = 14 # DHT22数据引脚连接到GPIO15
dht_sensor = dht.DHT22(Pin(DHT22_PIN))
# ==================== WiFi连接函数 ====================
def connect_wifi():
"""连接到Wokwi虚拟WiFi"""
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
print("正在连接WiFi...", end="")
# Wokwi平台的虚拟WiFi
sta_if.connect('Wokwi-GUEST', '')
# 等待连接成功
for _ in range(50): # 最多等待5秒
if sta_if.isconnected():
print("\nWiFi连接成功!")
print("IP地址:", sta_if.ifconfig()[0])
return True
print(".", end="")
time.sleep(0.1)
print("\nWiFi连接超时!")
return False
# ==================== MQTT回调函数 ====================
def mqtt_callback(topic, msg):
"""处理接收到的MQTT消息"""
try:
print(f"\n收到消息:")
print(f" 主题: {topic.decode()}")
print(f" 内容: {msg.decode()}")
# 解析JSON消息
message = ujson.loads(msg.decode())
# 检查是否是属性设置命令
if "params" in message:
params = message["params"]
# 检查是否有LED控制命令
if "LED" in params:
led_value = params["LED"]["value"]
print(f"\nLED控制命令: {led_value}")
# 控制LED
if led_value == True or led_value == 1:
led.value(1)
print("LED已打开")
elif led_value == False or led_value == 0:
led.value(0)
print("LED已关闭")
# 发送响应给云平台
send_property_set_reply(message["id"], 200, "success")
else:
print("未知的属性设置命令")
# 处理属性上报响应
elif "code" in message and "msg" in message:
print(f"上报响应: code={message.get('code')}, msg={message.get('msg')}")
except Exception as e:
print(f"处理消息时出错: {e}")
# ==================== 发送属性设置响应 ====================
def send_property_set_reply(msg_id, code, msg):
"""向云平台发送属性设置响应"""
try:
response = {
"id": str(msg_id), # 确保id是字符串类型
"code": code,
"msg": msg
}
client.publish(TOPIC_PROPERTY_SET_REPLY, ujson.dumps(response))
print(f"已发送响应: {response}")
except Exception as e:
print(f"发送响应时出错: {e}")
# ==================== 上报LED状态 ====================
def report_led_status():
"""向云平台上报LED当前状态"""
try:
# 获取LED引脚的原始值(0或1)
pin_value = led.value()
# 转换为布尔值(True/False)
# Python中:bool(1)=True, bool(0)=False
current_state = bool(pin_value)
# 构造OneJSON格式的消息
# 重要:bool类型属性不允许包含time字段!
message = {
"id": str(int(time.time())), # 使用秒级时间戳作为消息ID(字符串类型,不超过13位)
"version": "1.0",
"params": {
"LED": {
"value": current_state # 布尔值:True或False,不包含time字段
}
}
}
# 发布到属性上报主题
client.publish(TOPIC_PROPERTY_POST, ujson.dumps(message))
print(f"已上报LED状态: {'开' if current_state else '关'}")
print(f"上报的JSON: {ujson.dumps(message)}")
except Exception as e:
print(f"上报状态时出错: {e}")
# ==================== 读取DHT22温湿度传感器 ====================
def read_dht22():
"""读取DHT22温湿度传感器数据"""
try:
# 测量传感器数据
dht_sensor.measure()
# 获取温度(摄氏度)
temperature = dht_sensor.temperature()
# 获取湿度(百分比)
humidity = dht_sensor.humidity()
# 转换为int32类型(OneNET要求的类型)
temp_int32 = int(round(temperature))
humi_int32 = int(round(humidity))
print(f"读取到温度: {temperature}°C (转换为int32: {temp_int32})")
print(f"读取到湿度: {humidity}% (转换为int32: {humi_int32})")
return temp_int32, humi_int32
except OSError as e:
print(f"读取DHT22传感器时出错: {e}")
return None, None
except Exception as e:
print(f"处理传感器数据时出错: {e}")
return None, None
# ==================== 上报温湿度数据 ====================
def report_env_data():
"""向云平台上报温湿度数据"""
try:
# 读取DHT22传感器数据
temperature, humidity = read_dht22()
if temperature is not None and humidity is not None:
# 构造OneJSON格式的消息
message = {
"id": str(int(time.time())), # 使用秒级时间戳作为消息ID(字符串类型,不超过13位)
"version": "1.0",
"params": {
"temp_APP": {
"value": temperature, # int32类型的温度值
},
"humi_APP": {
"value": humidity, # int32类型的湿度值
}
}
}
# 发布到属性上报主题
client.publish(TOPIC_PROPERTY_POST, ujson.dumps(message))
print(f"已上报温湿度数据: 温度={temperature}°C, 湿度={humidity}%")
print(f"上报的JSON: {ujson.dumps(message)}")
except Exception as e:
print(f"上报温湿度数据时出错: {e}")
# ==================== 主程序 ====================
# 1. 连接WiFi
if not connect_wifi():
print("无法连接WiFi,程序终止")
while True:
pass
# 2. 初始化MQTT客户端
print("\n正在连接MQTT服务器...")
try:
client = MQTTClient(
client_id=DEVICE_NAME,
server=MQTT_SERVER,
port=MQTT_PORT,
user=PRODUCT_ID,
password=AUTH_TOKEN,
keepalive=60
)
# 设置回调函数
client.set_callback(mqtt_callback)
# 连接到MQTT服务器
client.connect()
print("MQTT连接成功!")
# 订阅属性设置主题
client.subscribe(TOPIC_PROPERTY_SET)
print(f"已订阅主题: {TOPIC_PROPERTY_SET}")
# 订阅属性上报响应主题(用于调试)
client.subscribe(TOPIC_PROPERTY_POST_REPLY)
print(f"已订阅响应主题: {TOPIC_PROPERTY_POST_REPLY}")
# 3. 初始LED状态
led.value(0) # 初始关闭LED
print("LED初始状态: 关闭")
# 4. 上报初始状态
report_led_status()
print("\n设备已上线,等待云平台命令...")
# 5. 主循环
last_report_time = time.ticks_ms()
last_sensor_read_time = time.ticks_ms()
while True:
# 检查MQTT消息(非阻塞)
client.check_msg()
# 每30秒上报一次LED状态
if time.ticks_diff(time.ticks_ms(), last_report_time) > 30000:
report_led_status()
last_report_time = time.ticks_ms()
# 每3秒读取并上报一次DHT22传感器数据
# DHT22最小读取间隔为2秒,所以设置为3秒比较安全
if time.ticks_diff(time.ticks_ms(), last_sensor_read_time) > 3000:
report_env_data()
last_sensor_read_time = time.ticks_ms()
# 短暂延时,避免CPU占用过高
time.sleep(0.1)
except Exception as e:
print(f"MQTT连接或运行出错: {e}")
print("5秒后尝试重新连接...")
time.sleep(5)