import dht
import machine
import time
import ssd1306
# 全局配置
DHT_PIN = 14 # DHT22数据引脚
I2C_SCL = 5 # OLED I2C 时钟线
I2C_SDA = 4 # OLED I2C 数据线
OLED_WIDTH = 128 # OLED 宽度
OLED_HEIGHT = 64 # OLED 高度
MQ2_ADC_PIN = 0 # MQ2 模拟输出接 ADC0
MQ2_DIGITAL_PIN = 18# MQ2 数字输出接 GPIO18
# MQ2 简化配置(直接用原始 ADC 值,复杂校准可后续扩展)
SMOKE_THRESHOLD = 2000 # 烟雾触发阈值(根据实际校准调整)
def init_display():
"""初始化 OLED 显示屏"""
i2c = machine.I2C(scl=machine.Pin(I2C_SCL), sda=machine.Pin(I2C_SDA))
return ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c)
def init_mq2():
"""初始化 MQ2 传感器(ADC + 数字引脚)"""
adc = machine.ADC(MQ2_ADC_PIN)
digital_pin = machine.Pin(MQ2_DIGITAL_PIN, machine.Pin.IN)
return adc, digital_pin
def read_dht22():
"""读取 DHT22 温湿度(增加重试逻辑)"""
dht_sensor = dht.DHT22(machine.Pin(DHT_PIN))
for _ in range(3):
try:
dht_sensor.measure()
return dht_sensor.temperature(), dht_sensor.humidity()
except OSError:
time.sleep(1)
return None, None
def read_mq2(adc, digital_pin):
"""读取 MQ2 原始数据(简化版,对齐 Wokwi 逻辑)"""
adc_value = adc.read() # 直接读 ADC 原始值
digital_state = digital_pin.value()
is_smoke = digital_state == 0 or adc_value > SMOKE_THRESHOLD
return adc_value, is_smoke
def display_data(temperature, humidity, mq2_adc, is_smoke, display):
"""显示所有数据到 OLED 和终端"""
display.fill(0)
# 显示 DHT22 数据
if temperature is not None and humidity is not None:
print(f"温度: {temperature}°C | 湿度: {humidity}%")
display.text(f"Temp: {temperature} C", 0, 0)
display.text(f"Humi: {humidity} %", 0, 12)
else:
print("DHT22 读取失败")
display.text("DHT22 Error", 0, 0)
# 显示 MQ2 数据
print(f"MQ2 原始值: {mq2_adc} | 烟雾检测: {'是' if is_smoke else '否'}")
display.text(f"MQ2: {mq2_adc}", 0, 24)
display.text(f"Smoke: {'Detected' if is_smoke else 'Normal'}", 0, 36)
display.show()
def main():
"""主程序逻辑"""
print("监控系统启动...")
display = init_display()
display.text("Starting...", 0, 0)
display.show()
adc, digital_pin = init_mq2()
time.sleep(2)
try:
while True:
# 读取传感器
temp, humi = read_dht22()
mq2_adc, is_smoke = read_mq2(adc, digital_pin)
# 显示数据
display_data(temp, humi, mq2_adc, is_smoke, display)
time.sleep(1)
except KeyboardInterrupt:
print("\n程序停止")
display.fill(0)
display.text("Stopped", 0, 0)
display.show()
except Exception as e:
print(f"错误: {e}")
display.fill(0)
display.text("Error", 0, 0)
display.show()
if __name__ == "__main__":
main()