# main.py - 环境监测系统主程序(修正管脚版)
from machine import Pin, I2C, ADC
import time
from ssd1306 import SSD1306_I2C
from mqx import MQX
from dht22 import DHT22
# ======================================== 硬件配置(修正后管脚)=========================================
# OLED屏幕(I2C接口)
i2c_oled = I2C(0, scl=Pin(1), sda=Pin(0)) # 按实际接线修改引脚
oled = SSD1306_I2C(128, 64, i2c_oled, addr=0x3C, external_vcc=False) # 修正参数顺序
# MQ-2气体传感器 - 修正为原MQ系列管脚
mq2_ao_pin = ADC(Pin(26)) # 修正为ADC0(GPIO26)
mq2_ao_pin.atten(ADC.ATTN_11DB) # 支持0-3.3V输入
mq2_do_pin = Pin(5, Pin.IN) # 修正为GPIO5
# 火焰传感器 - 新增管脚(原配置未涉及,使用GPIO3)
flame_do_pin = Pin(3, Pin.IN) # 保持原定义(未冲突)
# DHT22温湿度传感器 - 修正为原DHT管脚
dht_pin = Pin(4, Pin.IN, Pin.PULL_UP) # 修正为GPIO4
dht_sensor = DHT22(dht_pin, state_machine_id=0) # 使用状态机0
# 报警装置(LED或蜂鸣器)- 修正为原报警灯管脚
alarm_pin = Pin(15, Pin.OUT, value=0) # 修正为GPIO15,低电平不报警
# ======================================== 配置参数 =========================================
# 报警阈值(根据实际环境校准)
MQ2_AO_THRESHOLD = 2.0 # MQ-2模拟量报警阈值(V)
ALARM_COOLDOWN = 2000 # 报警冷却时间(ms),避免频繁切换
REFRESH_INTERVAL = 1000 # 数据刷新间隔(ms)
# ======================================== 全局变量 =========================================
last_alarm_time = 0 # 上次报警时间戳
# ======================================== MQ-2回调函数 =========================================
def mq2_irq_callback(voltage):
"""MQ-2传感器中断回调(用于紧急情况处理)"""
global last_alarm_time
current_time = time.ticks_ms()
if voltage > MQ2_AO_THRESHOLD and time.ticks_diff(current_time, last_alarm_time) > ALARM_COOLDOWN:
print(f"紧急报警:气体浓度超标!电压={voltage:.2f}V")
last_alarm_time = current_time
# 初始化MQ-2传感器
mq2_sensor = MQX(
adc=mq2_ao_pin,
comp_pin=mq2_do_pin,
user_cb=mq2_irq_callback,
rl_ohm=10000, # 10kΩ负载电阻
vref=3.3,
irq_trigger=Pin.IRQ_FALLING # DO引脚低电平触发中断(气体超标)
)
mq2_sensor.select_builtin("MQ2") # 使用内置MQ-2多项式
# ======================================== 工具函数 =========================================
def clear_oled():
"""清空OLED屏幕"""
oled.fill(0)
oled.show()
def display_data(gas_ppm, temp, humi, alarm_state):
"""在OLED上显示监测数据"""
clear_oled()
# 气体浓度
oled.text(f"Gas: {gas_ppm:.1f} ppm", 0, 0)
# 温湿度
oled.text(f"Temp: {temp:.1f}C" if temp else "Temp: --", 0, 16)
oled.text(f"Humi: {humi:.1f}%" if humi else "Humi: --", 0, 32)
# 报警状态
oled.text("ALARM!" if alarm_state else "Normal", 0, 48)
oled.show()
# ======================================== 主循环 =========================================
def main():
global last_alarm_time
print("系统启动中...")
oled.text("Initializing...", 0, 24)
oled.show()
time.sleep(2)
clear_oled()
try:
while True:
current_time = time.ticks_ms()
# 读取传感器数据
# 1. MQ-2气体传感器(3次采样平均)
gas_ppm = mq2_sensor.read_ppm(samples=3, delay_ms=100)
mq2_voltage = mq2_sensor.last_voltage # 获取最新电压
mq2_do_state = mq2_do_pin.value() # DO引脚状态(0=超标,1=正常)
# 2. 火焰传感器
flame_state = flame_do_pin.value() # 1=检测到火焰,0=正常
# 3. DHT22温湿度(避免频繁读取)
temp, humi = dht_sensor.get_temperature_and_humidity()
# 判断报警条件
alarm_condition = (
(mq2_do_state == 0) or # MQ-2数字输出报警
(mq2_voltage > MQ2_AO_THRESHOLD) or # MQ-2模拟量超标
(flame_state == 1) # 检测到火焰
)
# 报警逻辑(带冷却)
if alarm_condition:
last_alarm_time = current_time
alarm_pin.value(1) # 触发报警
else:
if time.ticks_diff(current_time, last_alarm_time) > ALARM_COOLDOWN:
alarm_pin.value(0) # 解除报警
# 显示数据(处理异常值)
gas_ppm = gas_ppm if not float('nan') else 0.0
display_data(gas_ppm, temp, humi, alarm_condition)
# 串口打印调试信息
print(f"[ {time.localtime()[3]:02d}:{time.localtime()[4]:02d}:{time.localtime()[5]:02d} ]")
print(f"气体: {gas_ppm:.1f}ppm (电压: {mq2_voltage:.2f}V, DO: {mq2_do_state})")
print(f"温湿度: {temp:.1f}C / {humi:.1f}%" if temp and humi else "温湿度: 读取失败")
print(f"火焰: {'检测到' if flame_state else '正常'}, 报警: {'触发' if alarm_condition else '未触发'}\n")
time.sleep_ms(REFRESH_INTERVAL)
except KeyboardInterrupt:
# 手动停止程序
clear_oled()
oled.text("System Stopped", 0, 24)
oled.show()
mq2_sensor.deinit() # 释放MQ-2资源
alarm_pin.value(0) # 关闭报警
print("程序已手动停止")
if __name__ == "__main__":
main()