from machine import Pin, ADC, I2C
import ssd1306
import time
from umqtt.simple import MQTTClient
import ubinascii
import machine
# 网络配置(需替换为实际WiFi信息)
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# 巴法云MQTT配置
MQTT_BROKER = "bemfa.com"
MQTT_PORT = 9501
MQTT_CLIENT_ID = "7af89f740f95480fbd7c1509ae3c5d3e"
MQTT_LIGHT_TOPIC = "MQ2"
# MQ2传感器配置
MQ2_PIN = 34 # ADC复用管脚为GP34
RL_VALUE = 5 # 负载电阻值(kΩ)
RO_CLEAN_AIR_FACTOR = 9.83 # MQ2在洁净空气中的电阻比值
# OLED显示屏配置
OLED_WIDTH = 128
OLED_HEIGHT = 64
OLED_SCL = 18
OLED_SDA = 23
# 继电器和LED配置
RELAY_PIN = 15 # 继电器控制引脚
ALARM_LED_PIN = 15 # 继电器控制的LED引脚
SMOKE_THRESHOLD = 50 # 烟雾浓度低于此值时触发报警(ppm)
# 气体类型定义及对应参数
GAS_LPG = 0
GAS_CO = 1
GAS_SMOKE = 2
# 不同气体的曲线参数(a:斜率, b:截距)
GAS_PARAMS = {
GAS_LPG: {'a': -0.45, 'b': 2.95},
GAS_CO: {'a': -0.35, 'b': 2.30},
GAS_SMOKE: {'a': -0.42, 'b': 3.50},
}
# 全局变量
mq2_adc = None
ro = None # 传感器在洁净空气中的电阻值
oled = None
mqtt_client = None
relay = None
last_alarm_time = 0
alarm_interval = 1000 # 报警闪烁间隔(毫秒)
def setup_wifi():
"""连接WiFi网络"""
import network
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('正在连接WiFi...')
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
while not wlan.isconnected():
pass
print('网络配置成功, IP地址:', wlan.ifconfig()[0])
def setup_mqtt():
"""初始化MQTT客户端并连接到巴法云平台"""
global mqtt_client
print("正在连接到MQTT服务器...")
mqtt_client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, MQTT_PORT)
try:
mqtt_client.connect()
print("MQTT连接成功")
return True
except Exception as e:
print(f"MQTT连接失败: {e}")
return False
def setup_relay():
"""初始化继电器控制引脚"""
global relay
relay = Pin(RELAY_PIN, Pin.OUT)
relay.value(0) # 初始状态关闭继电器
def setup():
"""初始化硬件组件"""
global mq2_adc, oled, ro
# 初始化MQ2传感器
mq2_adc = ADC(Pin(MQ2_PIN))
mq2_adc.atten(ADC.ATTN_11DB) # 11dB 衰减, 最大输入电压约3.6v
# 初始化OLED显示屏
i2c = I2C(0, scl=Pin(OLED_SCL), sda=Pin(OLED_SDA), freq=400000)
oled = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c)
# 初始化继电器
setup_relay()
# 显示初始化信息
oled.fill(0)
oled.text("MQ2 Sensor Init...", 0, 30)
oled.show()
# 连接WiFi
setup_wifi()
# 连接MQTT服务器
setup_mqtt()
# 传感器预热并计算Ro值
print("MQ2传感器预热中 (需要约5秒)...")
for i in range(5):
print(f"预热进度: {i+1}/5秒")
oled.fill(0)
oled.text(f"预热中: {i+1}/5秒", 20, 30)
oled.show()
time.sleep(1)
# 计算洁净空气中的Ro值(采样100次取平均)
rs = 0
for i in range(100):
rs += calculate_rs(mq2_adc.read())
time.sleep(0.01)
ro = rs / 100 / RO_CLEAN_AIR_FACTOR
print(f"Ro值计算完成: {ro:.2f} kΩ")
# 显示系统就绪信息
oled.fill(0)
oled.text("MQ2 Monitor Ready", 10, 20)
oled.text(f"Ro: {ro:.2f} kΩ", 20, 40)
oled.show()
time.sleep(2)
def calculate_rs(adc_value):
"""计算传感器电阻Rs"""
if adc_value == 0:
adc_value = 1 # 防止除零错误
return RL_VALUE * (4095.0 / adc_value - 1)
def calculate_ppm(rs_ro_ratio, gas_type):
"""根据Rs/Ro比值和气体类型计算浓度(ppm)"""
if gas_type not in GAS_PARAMS:
return None
a = GAS_PARAMS[gas_type]['a']
b = GAS_PARAMS[gas_type]['b']
# 使用公式: log(ppm) = a * log(Rs/Ro) + b
return 10 ** (a * (rs_ro_ratio) + b)
def read_mq2_sensor():
"""读取MQ2传感器数据"""
try:
# 读取ADC值并计算Rs
adc_value = mq2_adc.read()
rs = calculate_rs(adc_value)
# 计算Rs/Ro比值
rs_ro_ratio = rs / ro
# 计算各种气体浓度
lpg_ppm = calculate_ppm(rs_ro_ratio, GAS_LPG)
co_ppm = calculate_ppm(rs_ro_ratio, GAS_CO)
smoke_ppm = calculate_ppm(rs_ro_ratio, GAS_SMOKE)
return {
'adc': adc_value,
'rs': rs,
'rs_ro_ratio': rs_ro_ratio,
'lpg': lpg_ppm,
'co': co_ppm,
'smoke': smoke_ppm
}
except Exception as e:
print(f"传感器读取错误: {e}")
return None
def update_console(data):
"""更新控制台显示"""
if data is None:
print("传感器读取失败")
return
print(f"ADC值: {data['adc']}")
print(f"Rs值: {data['rs']:.2f} kΩ")
print(f"Rs/Ro比值: {data['rs_ro_ratio']:.2f}")
print(f"LPG浓度: {data['lpg']:.0f} ppm")
print(f"CO浓度: {data['co']:.0f} ppm")
print(f"烟雾浓度: {data['smoke']:.0f} ppm")
print("-" * 30)
def update_oled(data, alarm_active=False, alarm_msg=""):
"""更新OLED屏幕显示"""
oled.fill(0) # 清屏
if data is None:
oled.text("Sensor Error", 20, 30)
oled.show()
return
# 显示标题
oled.text("MQ2 Sensor Data", 0, 0)
# 显示LPG浓度
oled.text("LPG:", 10, 15)
oled.text(f"{data['lpg']:.0f} ppm", 60, 15)
# 显示CO浓度
oled.text("CO:", 10, 30)
oled.text(f"{data['co']:.0f} ppm", 60, 30)
# 显示烟雾浓度
oled.text("Smoke:", 10, 45)
oled.text(f"{data['smoke']:.0f} ppm", 60, 45)
# 显示更新时间戳
current_time = time.localtime()
time_str = f"{current_time[3]:02d}:{current_time[4]:02d}:{current_time[5]:02d}"
oled.text(time_str, 90, 56)
# 显示报警信息
if alarm_active:
oled.text("ALARM!", 0, 56)
if alarm_msg:
oled.text(alarm_msg, 0, 56)
oled.show() # 更新显示
def publish_to_bemfa(data, alarm_active=False, alarm_msg=""):
"""将传感器数据发布到巴法云平台"""
if data is None or mqtt_client is None:
return
try:
# 构建JSON格式数据
message = f"{{\"lpg\":{data['lpg']:.0f},\"co\":{data['co']:.0f},\"smoke\":{data['smoke']:.0f}"
if alarm_active:
message += f",\"alarm\":1,\"alarm_msg\":\"{alarm_msg}\""
else:
message += ",\"alarm\":0"
message += "}"
# 发布数据到MQTT主题
mqtt_client.publish(MQTT_LIGHT_TOPIC, message)
print("数据已发布到巴法云平台")
except Exception as e:
print(f"发布到巴法云失败: {e}")
# 尝试重新连接MQTT
setup_mqtt()
def control_relay_alarm(active):
"""控制继电器报警LED"""
global last_alarm_time
current_time = time.ticks_ms()
if active:
# 计算时间间隔
time_elapsed = time.ticks_diff(current_time, last_alarm_time)
# 每500ms切换一次继电器状态,实现LED闪烁
if time_elapsed >= 500:
relay.value(not relay.value()) # 切换继电器状态
last_alarm_time = current_time
else:
relay.value(0) # 关闭继电器
def check_alarm(data):
"""检查是否需要触发警报"""
# 设定报警阈值 (ppm)
lpg_threshold = 2000
co_threshold = 100
smoke_low_threshold = SMOKE_THRESHOLD # 烟雾浓度低于此值触发报警
alarm = False
alarm_msg = ""
# 检查LPG浓度超标
if data['lpg'] > lpg_threshold:
alarm = True
alarm_msg += "LPG超标! "
# 检查CO浓度超标
if data['co'] > co_threshold:
alarm = True
alarm_msg += "CO超标! "
# 检查烟雾浓度低于阈值
if data['smoke'] < smoke_low_threshold:
alarm = True
alarm_msg += f"烟雾浓度过低({data['smoke']:.0f}ppm)! "
if alarm:
print(f"警报: {alarm_msg}")
# 在OLED上显示警报信息
update_oled(data, True, alarm_msg)
# 控制继电器报警LED闪烁
control_relay_alarm(True)
# 发布警报信息到巴法云
publish_to_bemfa(data, True, alarm_msg)
return True
# 没有报警时关闭继电器
control_relay_alarm(False)
update_oled(data)
publish_to_bemfa(data)
return False
if __name__ == "__main__":
print("启动MQ2气体监测系统...")
try:
# 初始化系统
setup()
# 主循环
while True:
# 读取传感器数据
sensor_data = read_mq2_sensor()
# 更新控制台显示
update_console(sensor_data)
# 检查是否需要触发警报
check_alarm(sensor_data)
# 等待1秒
time.sleep(1)
except KeyboardInterrupt:
print("\n程序已停止")
if oled:
oled.fill(0)
oled.show() # 清屏
if mqtt_client:
mqtt_client.disconnect()
if relay:
relay.value(0) # 确保关闭继电器
except Exception as e:
print(f"发生错误: {e}")
if oled:
oled.fill(0)
oled.text("System Error", 20, 30)
oled.show()
if mqtt_client:
mqtt_client.disconnect()
if relay:
relay.value(0) # 确保关闭继电器