import dht
import machine
import time
import ssd1306
from umqtt.simple import MQTTClient
# 全局配置
DHT_PIN = 14 # DHT22数据引脚连接到GPIO14
I2C_SCL = 5 # I2C时钟线连接到GPIO5
I2C_SDA = 4 # I2C数据线连接到GPIO4
OLED_WIDTH = 128 # OLED宽度
OLED_HEIGHT = 64 # OLED高度
# 需要修改的地方
wifiName = "Wokwi-GUEST" # wifi名称,不支持5G wifi
wifiPassword = "" # wifi密码
clientID = "d2bc31d11dbd41a08255a2d5a60a1dee" # Client ID,密钥,巴法云控制台获取
tempTopic = "mytemp" # 温度主题,巴法MQTT控制台创建
humiTopic = "humi" # 湿度主题,巴法MQTT控制台创建
# 默认设置
serverIP = "bemfa.com" # mqtt服务器地址
port = 9501
# WIFI连接函数
def do_connect():
import network
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print('connecting to network...')
sta_if.active(True)
sta_if.connect(wifiName, wifiPassword)
max_retries = 10
retry_count = 0
while not sta_if.isconnected() and retry_count < max_retries:
time.sleep(1)
retry_count += 1
if not sta_if.isconnected():
print("Failed to connect to WiFi after multiple retries.")
raise OSError("WiFi connection failed")
print('connect WiFi ok')
# 初始化mqtt连接配置
def connect_and_subscribe():
max_retries = 5
retry_count = 0
while retry_count < max_retries:
try:
client = MQTTClient(clientID, serverIP, port)
client.connect()
print("Connected to %s" % serverIP)
return client
except OSError as e:
print(f"MQTT connection error (attempt {retry_count + 1}): {e}")
retry_count += 1
time.sleep(2)
print("Failed to connect to MQTT broker after multiple retries.")
raise OSError("MQTT connection failed")
def init_display():
"""初始化OLED显示屏"""
i2c = machine.I2C(scl=machine.Pin(I2C_SCL), sda=machine.Pin(I2C_SDA))
display = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c)
return display
def read_sensor_data():
"""读取DHT22传感器数据"""
sensor = dht.DHT22(machine.Pin(DHT_PIN))
try:
sensor.measure()
temperature = sensor.temperature()
humidity = sensor.humidity()
return temperature, humidity
except OSError as e:
print(f"传感器读取错误: {e}")
return None, None
def display_data(temperature, humidity, display):
"""格式化并显示温湿度数据到终端和OLED屏幕"""
if temperature is not None and humidity is not None:
# 显示到终端
print(f"温度: {temperature}°C | 湿度: {humidity}%")
# 显示到OLED屏幕
display.fill(0) # 清屏
display.text("Temp & Humidity", 0, 0)
display.text(f"Temp: {temperature} C", 0, 20)
display.text(f"Humi: {humidity} %", 0, 40)
display.show() # 更新显示
else:
print("无法读取传感器数据")
display.fill(0)
display.text("Sensor Error", 0, 0)
display.show()
def publish_to_mqtt(client, temperature, humidity):
"""发布温湿度数据到MQTT主题"""
if temperature is not None and humidity is not None:
try:
# 发布温度数据到mytemp主题
client.publish(tempTopic, str(temperature))
# 发布湿度数据到humi主题
client.publish(humiTopic, str(humidity))
print("Data published to MQTT Broker!")
except Exception as e:
print(f"Error publishing data to MQTT: {e}")
raise # 重新抛出异常以便上层处理
def main():
"""主函数,程序入口点"""
print("DHT22温湿度监控系统启动...")
# 连接WiFi
try:
do_connect()
except OSError as e:
print(f"WiFi连接错误: {e}")
time.sleep(10)
machine.reset()
# 连接MQTT
try:
client = connect_and_subscribe()
except OSError as e:
print(f"MQTT连接错误: {e}")
time.sleep(10)
machine.reset()
# 初始化显示屏
display = init_display()
display.text("System Starting...", 0, 0)
display.show()
time.sleep(2)
try:
while True:
# 读取传感器数据
temperature, humidity = read_sensor_data()
# 显示数据
display_data(temperature, humidity, display)
# 发布数据到MQTT
try:
publish_to_mqtt(client, temperature, humidity)
except Exception as e:
print(f"MQTT发布失败,尝试重连: {e}")
client = connect_and_subscribe()
# 重连成功后再次尝试发布
publish_to_mqtt(client, temperature, humidity)
# 等待2秒(DHT22最小采样间隔)
time.sleep(2)
except KeyboardInterrupt:
print("\n程序已停止")
display.fill(0)
display.text("Program Stopped", 0, 0)
display.show()
except Exception as e:
print(f"发生错误: {e}")
display.fill(0)
display.text("System Error", 0, 0)
display.show()
finally:
# 断开MQTT连接
try:
client.disconnect()
print("Disconnected from MQTT Broker")
except Exception:
pass
# 程序入口
if __name__ == "__main__":
main()