import dht
import machine
import time
import ssd1306
from umqtt.simple import MQTTClient
# ------------------ 全局配置 ------------------ #
# DHT22 配置
DHT_PIN = 14 # DHT22 数据引脚(GPIO14)
# MQ2 烟雾传感器配置
MQ2_ADC_PIN = 25 # MQ2 模拟输出引脚(GPIO34,需根据硬件调整)
adc = machine.ADC(machine.Pin(MQ2_ADC_PIN)) # 初始化 ADC
adc.atten(machine.ADC.ATTN_11DB) # 设置衰减(支持 0-3.3V 输入)
# OLED 显示屏配置
I2C_SCL = 5 # I2C 时钟线(GPIO5)
I2C_SDA = 4 # I2C 数据线(GPIO4)
OLED_WIDTH = 128 # OLED 宽度
OLED_HEIGHT = 64 # OLED 高度
# WiFi & MQTT 配置
wifiName = "" # WiFi 名称(需修改)
wifiPassword = "Wokwi-GUEST" # WiFi 密码
clientID = "bfa6f5716db588e60333113734b2de8d" # 巴法云 Client ID
tempTopic = "TEM" # 温度主题
humiTopic = "DEH" # 湿度主题
smokeTopic = "GAS" # 烟雾浓度主题(新增,需在巴法云创建)
serverIP = "bemfa.com" # MQTT 服务器地址
port = 9501 # MQTT 端口
# ------------------ 函数定义 ------------------ #
def do_connect():
"""WiFi 连接函数"""
import network
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print('Connecting to network...')
sta_if.active(True)
sta_if.connect(wifiName, wifiPassword)
max_retries = 10
retry_count = 0
while not sta_if.isconnected() and retry_count < max_retries:
time.sleep(1)
retry_count += 1
if not sta_if.isconnected():
print("Failed to connect to WiFi after multiple retries.")
raise OSError("WiFi connection failed")
print('WiFi connected:', sta_if.ifconfig())
def connect_and_subscribe():
"""MQTT 连接函数"""
max_retries = 5
retry_count = 0
while retry_count < max_retries:
try:
client = MQTTClient(clientID, serverIP, port)
client.connect()
print("Connected to MQTT Broker:", serverIP)
return client
except OSError as e:
print(f"MQTT connection error (attempt {retry_count + 1}): {e}")
retry_count += 1
time.sleep(2)
print("Failed to connect to MQTT broker after multiple retries.")
raise OSError("MQTT connection failed")
def init_display():
"""初始化 OLED 显示屏"""
i2c = machine.I2C(scl=machine.Pin(I2C_SCL), sda=machine.Pin(I2C_SDA))
display = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c)
return display
def read_dht_data():
"""读取 DHT22 温湿度数据"""
sensor = dht.DHT22(machine.Pin(DHT_PIN))
try:
sensor.measure()
temperature = sensor.temperature()
humidity = sensor.humidity()
return temperature, humidity
except OSError as e:
print(f"DHT22 Error: {e}")
return None, None
def read_mq2_data():
"""读取 MQ2 烟雾浓度(ADC 值)"""
try:
adc_value = adc.read() # 读取 ADC 原始值(0-4095)
# 可选:将 ADC 值转换为电压或浓度(需校准)
# voltage = adc_value / 4095 * 3.3 # 转换为电压(假设 3.3V 参考)
return adc_value
except OSError as e:
print(f"MQ2 Error: {e}")
return None
def display_all_data(temperature, humidity, smoke, display):
"""显示温湿度、烟雾浓度到 OLED 和终端"""
# 清屏
display.fill(0)
# 显示标题
display.text("Env Monitor", 0, 0)
# 显示 DHT22 数据
if temperature is not None and humidity is not None:
display.text(f"Temp: {temperature}°C", 0, 20)
display.text(f"Humi: {humidity}%", 0, 32)
print(f"温度: {temperature}°C | 湿度: {humidity}%")
else:
display.text("DHT22 Error", 0, 20)
print("DHT22 数据读取失败")
# 显示 MQ2 数据
if smoke is not None:
display.text(f"Smoke: {smoke}", 0, 44)
print(f"烟雾浓度: {smoke} (ADC 值)")
else:
display.text("MQ2 Error", 0, 44)
print("MQ2 数据读取失败")
# 更新 OLED 显示
display.show()
def publish_all_data(client, temperature, humidity, smoke):
"""发布温湿度、烟雾浓度到 MQTT"""
try:
if temperature is not None:
client.publish(tempTopic, str(temperature))
if humidity is not None:
client.publish(humiTopic, str(humidity))
if smoke is not None:
client.publish(smokeTopic, str(smoke))
print("Data published to MQTT Broker!")
except Exception as e:
print(f"MQTT Publish Error: {e}")
raise
def main():
"""主函数"""
print("环境监测系统启动...")
# 连接 WiFi
try:
do_connect()
except OSError as e:
print(f"WiFi 连接错误: {e}")
time.sleep(10)
machine.reset()
# 连接 MQTT
try:
client = connect_and_subscribe()
except OSError as e:
print(f"MQTT 连接错误: {e}")
time.sleep(10)
machine.reset()
# 初始化 OLED
display = init_display()
display.text("Starting...", 0, 0)
display.show()
time.sleep(2)
try:
while True:
# 读取传感器数据
temp, humi = read_dht_data()
smoke = read_mq2_data()
# 显示数据
display_all_data(temp, humi, smoke, display)
# 发布数据到 MQTT
try:
publish_all_data(client, temp, humi, smoke)
except Exception as e:
print(f"MQTT 发布失败,尝试重连: {e}")
client = connect_and_subscribe()
publish_all_data(client, temp, humi, smoke)
# 采样间隔(DHT22 需 ≥2 秒,MQ2 可按需调整)
time.sleep(2)
except KeyboardInterrupt:
print("\n程序已停止")
display.fill(0)
display.text("Stopped", 0, 0)
display.show()
except Exception as e:
print(f"发生错误: {e}")
display.fill(0)
display.text("Error", 0, 0)
display.show()
finally:
# 断开 MQTT 连接
try:
client.disconnect()
print("Disconnected from MQTT Broker")
except Exception:
pass
# 程序入口
if __name__ == "__main__":
main()