import dht
import machine
import time
from machine import Pin, I2C
import ssd1306
import network
from umqtt.simple import MQTTClient
# Global configuration
# NOTE(review): the WiFi and MQTT credentials below are hard-coded in source;
# consider loading them from a separate, untracked configuration file.
DHT_PIN = 14 # DHT22 data pin is wired to GPIO14
# SSD1306 OLED configuration
OLED_WIDTH = 128
OLED_HEIGHT = 64
OLED_SDA = 4 # GPIO4
OLED_SCL = 5 # GPIO5
# Wokwi WiFi configuration
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# Bemfa Cloud MQTT configuration
MQTT_CLIENT_ID = "f8d91192b8cf47b79d6cf0bc28cba822"
MQTT_BROKER = "bemfa.com"
MQTT_PORT = 9501
MQTT_USER = "[email protected]"
MQTT_PASSWORD = "956351911"
# Topics used both for publishing readings and for subscribing
TOPIC_TEMP = "temp"
TOPIC_HUM = "hum"
# Set up the I2C bus and the SSD1306 OLED display attached to it
i2c = I2C(0, sda=Pin(OLED_SDA), scl=Pin(OLED_SCL), freq=400000)
oled = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c)
# Create the WiFi station interface and activate it
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
def connect_wifi():
    """Join the configured WiFi network, showing progress on the OLED.

    If the station is not yet connected, a connection to ``WIFI_SSID`` is
    started and polled once per second for at most 20 attempts. The outcome
    is then printed and drawn on the display.

    Raises:
        RuntimeError: if the station is still not connected after polling.
    """
    if not wifi.isconnected():
        print("正在连接WiFi...")
        oled.fill(0)
        oled.text("Connecting WiFi", 0, 20)
        oled.show()
        wifi.connect(WIFI_SSID, WIFI_PASSWORD)

        # Poll budget: one attempt per second.
        attempts_left = 20
        while not wifi.isconnected() and attempts_left > 0:
            attempts_left -= 1
            print(f"等待WiFi连接...剩余尝试 {attempts_left}次")
            time.sleep(1)

    # Failure path first, so the success path reads straight through.
    if not wifi.isconnected():
        print("WiFi连接失败")
        oled.fill(0)
        oled.text("WiFi Failed", 0, 20)
        oled.show()
        raise RuntimeError("无法连接WiFi")

    print("WiFi连接成功:", wifi.ifconfig())
    oled.fill(0)
    oled.text("WiFi Connected", 0, 20)
    # First element of ifconfig() is shown under the status line.
    oled.text(wifi.ifconfig()[0], 0, 35)
    oled.show()
    time.sleep(1)
# Create the MQTT client (the connection itself is opened in connect_mqtt)
mqtt_client = MQTTClient(
    client_id=MQTT_CLIENT_ID,
    server=MQTT_BROKER,
    port=MQTT_PORT,
    user=MQTT_USER,
    password=MQTT_PASSWORD,
    keepalive=30 # shortened keepalive (heartbeat) interval
)
def mqtt_callback(topic, msg):
    """Handle an incoming MQTT message by logging it and showing it on the OLED.

    Args:
        topic: Topic of the message; decoded with ``.decode()`` for display.
        msg: Message payload; decoded with ``.decode()`` for display.
    """
    topic_text = topic.decode()
    payload_text = msg.decode()

    print(f"收到消息: {topic_text} -> {payload_text}")

    # Mirror the received message on the display.
    oled.fill(0)
    oled.text("MQTT Msg:", 0, 0)
    oled.text(f"{topic_text}:{payload_text}", 0, 20)
    oled.show()

    # Hold the message on screen before the caller redraws.
    time.sleep(2)
def _close_stale_mqtt_socket():
    """Close the socket left over from a previous MQTT session, if any."""
    stale_sock = getattr(mqtt_client, "sock", None)
    if stale_sock is None:
        return
    try:
        stale_sock.close()
    except OSError as e:
        # The old link is usually already dead, so closing is best-effort.
        print(f"关闭旧MQTT连接失败: {e}")


def connect_mqtt():
    """Connect to the MQTT broker and subscribe to the sensor topics.

    This function is also used as the reconnect path after a failed
    ``check_msg``/``ping``/``publish``. Any socket still held by the client
    from an earlier session is closed first so repeated reconnects do not
    accumulate open sockets on the device.

    Raises:
        Exception: whatever ``connect()`` or ``subscribe()`` raised, re-raised
            after the failure has been printed and shown on the OLED.
    """
    # NOTE(review): umqtt.simple's connect() is understood to create a new
    # socket on every call without closing the previous one - confirm against
    # the library version in use.
    _close_stale_mqtt_socket()
    try:
        mqtt_client.set_callback(mqtt_callback)
        mqtt_client.connect()
        print("MQTT连接成功")
        # Subscribing makes the device show up as online on the broker side.
        mqtt_client.subscribe(TOPIC_TEMP)
        mqtt_client.subscribe(TOPIC_HUM)
        print(f"已订阅主题: {TOPIC_TEMP}, {TOPIC_HUM}")
        oled.fill(0)
        oled.text("MQTT Connected", 0, 20)
        oled.text("Subscribed!", 0, 35)
        oled.show()
        time.sleep(1)
    except Exception as e:
        print(f"MQTT连接失败: {e}")
        oled.fill(0)
        oled.text("MQTT Failed", 0, 20)
        oled.text(str(e)[:20], 0, 35)  # show only the start of the error text
        oled.show()
        raise
def read_sensor_data():
    """Take one measurement from the DHT22 on ``DHT_PIN``.

    Returns:
        A ``(temperature, humidity)`` tuple as reported by the driver, or
        ``(None, None)`` when the read raises ``OSError``.
    """
    dht_sensor = dht.DHT22(machine.Pin(DHT_PIN))
    try:
        dht_sensor.measure()
        reading = (dht_sensor.temperature(), dht_sensor.humidity())
    except OSError as err:
        print(f"传感器读取错误: {err}")
        reading = (None, None)
    return reading
def publish_data(temperature, humidity):
    """Publish the given readings to their MQTT topics.

    Each value is sent as its ``str()`` form; a value of ``None`` is skipped.
    If anything in the publish sequence raises, the error is printed and a
    reconnect is attempted via ``connect_mqtt()`` (the reading is not resent).

    Args:
        temperature: Temperature reading, or ``None`` to skip it.
        humidity: Humidity reading, or ``None`` to skip it.
    """
    try:
        if temperature is not None:
            temp_payload = str(temperature)
            mqtt_client.publish(TOPIC_TEMP, temp_payload)
            print(f"已发布温度: {temperature}°C")

        if humidity is not None:
            hum_payload = str(humidity)
            mqtt_client.publish(TOPIC_HUM, hum_payload)
            print(f"已发布湿度: {humidity}%")
    except Exception as err:
        print(f"MQTT发布失败: {err}")
        # Try to re-establish the broker connection.
        connect_mqtt()
def display_data(temperature, humidity):
    """Render the readings on the OLED and serial console, then publish them.

    When both values are present they are drawn with one decimal place,
    optional HOT!/COLD!/HUMID! flags are added, and the values are passed to
    ``publish_data``. When either is ``None`` a sensor error is shown instead.
    A WiFi/MQTT status line is always drawn at the bottom.

    Args:
        temperature: Temperature reading, or ``None`` if the read failed.
        humidity: Humidity reading, or ``None`` if the read failed.
    """
    oled.fill(0)
    oled.text("Env Monitor", 0, 0)

    # Separator line under the title, drawn one pixel at a time.
    for col in range(OLED_WIDTH):
        oled.pixel(col, 12, 1)

    if temperature is None or humidity is None:
        oled.text("Sensor Error!", 0, 20)
        print("无法获取传感器数据")
    else:
        oled.text(f"Temp: {temperature:.1f}C", 0, 20)
        oled.text(f"Hum: {humidity:.1f}%", 0, 35)

        # Pick the temperature flag, if any, before drawing it.
        if temperature > 30:
            temp_flag = "HOT!"
        elif temperature < 15:
            temp_flag = "COLD!"
        else:
            temp_flag = None
        if temp_flag is not None:
            oled.text(temp_flag, 80, 20)

        if humidity > 70:
            oled.text("HUMID!", 80, 35)

        print(f"温度: {temperature}°C | 湿度: {humidity}%")
        publish_data(temperature, humidity)

    # Connection status; MQTT is reported from whether the client holds a socket.
    wifi_state = 'On' if wifi.isconnected() else 'Off'
    mqtt_state = 'On' if mqtt_client.sock else 'Off'
    oled.text(f"WiFi:{wifi_state} MQTT:{mqtt_state}", 0, 50)
    oled.show()
def main():
    """Program entry point: connect, then loop reading and reporting forever.

    After the initial WiFi and MQTT connections, each loop iteration checks
    for incoming MQTT messages, reads the sensor, updates the display (which
    also publishes the reading), pings the broker and sleeps for two seconds.
    A failed ``check_msg`` or ``ping`` triggers a reconnect.

    The loop ends on ``KeyboardInterrupt`` or on any other exception (for
    example a reconnect that fails); in every case the MQTT session is
    disconnected on the way out.

    Raises:
        RuntimeError: from ``connect_wifi()`` if the initial WiFi connection
            fails.
        Exception: whatever ``connect_mqtt()`` re-raises if the initial MQTT
            connection fails.
    """
    print("DHT22温度监控系统启动...")
    # Initial connections; failures here propagate to the caller.
    connect_wifi()
    connect_mqtt()
    try:
        while True:
            # Process any pending incoming message.
            try:
                mqtt_client.check_msg()
            except Exception as e:
                print(f"检查消息错误: {e}")
                connect_mqtt()  # reconnect on error

            # Read the sensor and show (and publish) the result.
            temperature, humidity = read_sensor_data()
            display_data(temperature, humidity)

            # Keep the session alive. Catch Exception rather than using a
            # bare except so Ctrl-C during a ping still stops the program.
            try:
                mqtt_client.ping()
            except Exception as e:
                print(f"心跳发送失败: {e}")
                connect_mqtt()

            time.sleep(2)
    except KeyboardInterrupt:
        print("\n程序已停止")
    except Exception as e:
        print(f"发生错误:{e}")
    finally:
        # Disconnect once for every exit path. The link may already be dead,
        # so a failure here is reported instead of raising a second error.
        try:
            mqtt_client.disconnect()
        except OSError as e:
            print(f"断开MQTT连接失败: {e}")


if __name__ == "__main__":
    main()