from machine import Pin, ADC, SoftI2C
import time
from ssd1306 import SSD1306_I2C
from umqtt.simple import MQTTClient
import network
import ujson
# 网络配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# 巴法云配置
BAFA_SERVER = "bemfa.com"
BAFA_PORT = 9501
CLIENT_ID = "ee7ca1783cfe4cd4bcf27ea0700fe3f9" # 替换为巴法云私钥
TOPIC = "MQ2" # MQTT主题
PASSWORD = "ee7ca1783cfe4cd4bcf27ea0700fe3f9" # 替换为巴法云API密钥
# MQ2传感器配置
MQ2_PIN = 34 # ADC引脚GP34
VOLTAGE_REF = 3.3 # 参考电压3.3V
MQ2_RL = 10 # 负载电阻10kΩ
# 气体浓度阈值
GAS_ALARM_LOW = 200 # 低浓度报警
GAS_ALARM_HIGH = 1000 # 高浓度报警
# OLED屏幕配置
OLED_WIDTH = 128
OLED_HEIGHT = 64
i2c = SoftI2C(sda=Pin(23), scl=Pin(18)) # Wokwi中默认I2C引脚
oled = SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c)
# 全局变量
mqtt_client = None
retry_count = 0
sta_if = None
# 初始化函数
def setup():
global gas_adc
gas_adc = ADC(Pin(MQ2_PIN))
gas_adc.atten(ADC.ATTN_11DB) # 11dB衰减
# 初始化OLED
oled.fill(0)
oled.text("MQ2 Gas Monitor", 0, 0)
oled.text("Connecting WiFi...", 0, 20)
oled.show()
# 连接WiFi
global sta_if
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
sta_if.active(True)
sta_if.connect(WIFI_SSID, WIFI_PASSWORD)
while not sta_if.isconnected():
pass
print('WiFi已连接,IP地址:', sta_if.ifconfig()[0])
# 初始化MQTT
init_mqtt()
# 控制台标题
print("=" * 50)
print(" Wokwi MQ2气体传感器监测系统")
print("=" * 50)
print("ADC值\t电压(mV)\t浓度(ppm)\t状态\t\tMQTT状态")
print("-" * 50)
# 初始化MQTT连接
def init_mqtt():
global mqtt_client
try:
mqtt_client = MQTTClient(CLIENT_ID, BAFA_SERVER, BAFA_PORT, "", PASSWORD)
mqtt_client.set_callback(mqtt_callback)
mqtt_client.connect()
mqtt_client.subscribe(TOPIC)
print("已连接到巴法云MQTT服务器")
update_oled_status("MQTT: 已连接")
except Exception as e:
print(f"MQTT连接失败: {e}")
update_oled_status("MQTT: 连接失败")
mqtt_client = None
# MQTT回调函数
def mqtt_callback(topic, msg):
print(f"收到来自主题 {topic.decode()} 的消息: {msg.decode()}")
# 计算电压值(mV)
def get_voltage(adc_value):
voltage = (adc_value / 4095) * VOLTAGE_REF
return round(voltage * 1000, 2)
# 计算气体浓度(ppm)
def get_gas_concentration(adc_value):
if adc_value > 3500:
return 0
elif adc_value < 1000:
return 2000
else:
return round((3500 - adc_value) * 2000 / 2500, 2)
# 判断气体状态
def get_gas_status(concentration):
if concentration == 0:
return "清洁空气"
elif concentration < GAS_ALARM_LOW:
return "低浓度"
elif concentration < GAS_ALARM_HIGH:
return "中浓度"
else:
return "高浓度! 报警!"
# 更新OLED显示
def update_oled(adc_value, voltage, concentration, status, mqtt_status):
oled.fill(0) # 清屏
# 显示标题
oled.text("MQ2 Gas Sensor", 0, 0)
# 显示ADC值和电压
oled.text(f"ADC: {adc_value}", 0, 16)
oled.text(f"Volt: {voltage}mV", 0, 32)
# 显示浓度和状态
oled.text(f"Gas: {concentration}ppm", 0, 48)
oled.text(status, 0, 60)
# 显示MQTT状态
oled.text(f"MQTT: {mqtt_status}", 0, 52)
# 报警指示
if "报警" in status:
oled.fill_rect(0, 56, OLED_WIDTH, 8, 1) # 红色警告条
oled.show() # 更新显示
# 更新OLED上的MQTT状态
def update_oled_status(status):
oled.fill_rect(0, 52, OLED_WIDTH, 12, 0) # 清除旧状态
oled.text(f"MQTT: {status}", 0, 52)
oled.show()
# 重连MQTT
def reconnect_mqtt():
global retry_count, mqtt_client
retry_count += 1
wait_time = min(30, 2 ** retry_count) # 指数退避
print(f"MQTT重连尝试 {retry_count},{wait_time}秒后重试...")
time.sleep(wait_time)
init_mqtt()
# 发布数据到MQTT
def publish_to_mqtt(adc_value, voltage, concentration, status):
global mqtt_client
if mqtt_client:
try:
data = {
"adc": adc_value,
"voltage": voltage,
"concentration": concentration,
"status": status,
"timestamp": time.time()
}
mqtt_client.publish(TOPIC, ujson.dumps(data))
mqtt_client.check_msg()
except Exception as e:
print(f"MQTT发布失败: {e}")
mqtt_client = None
reconnect_mqtt()
# 主循环
def loop():
try:
while True:
adc_value = gas_adc.read()
voltage = get_voltage(adc_value)
concentration = get_gas_concentration(adc_value)
status = get_gas_status(concentration)
# MQTT状态
mqtt_status = "已连接" if mqtt_client else "断开中"
# 控制台输出
print(f"{adc_value}\t{voltage}\t{concentration}\t{status}\t{mqtt_status}")
if concentration >= GAS_ALARM_HIGH:
print("!" * 50)
# OLED显示
update_oled(adc_value, voltage, concentration, status, mqtt_status)
# 发布到MQTT
publish_to_mqtt(adc_value, voltage, concentration, status)
time.sleep(0.5) # 500ms更新一次
except KeyboardInterrupt:
print("\n" + "-" * 50)
print("程序已停止")
oled.fill(0)
oled.text("System Stopped", 0, 32)
oled.show()
if mqtt_client:
mqtt_client.disconnect()
# 程序入口
if __name__ == '__main__':
setup()
loop()