from machine import Pin, ADC, I2C, Timer
import network
import time
from umqtt.simple import MQTTClient
from ssd1306 import SSD1306_I2C
import json
# ==================== 配置参数 ====================
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
BEMFA_SERVER = "bemfa.com"
BEMFA_PORT = 9501
BEMFA_CLIENT_ID = "d461f6553a7905f58475606696123f00"
BEMFA_TOPIC = "myMQ2"
BEMFA_ONLINE_TOPIC = f"{BEMFA_TOPIC}_online"
# MQ-2传感器配置
MQ2_PIN = 34 # 烟雾传感器AO引脚
mq2_adc = ADC(Pin(MQ2_PIN))
mq2_adc.atten(ADC.ATTN_11DB) # 11dB衰减,最大输入电压约3.6V
# OLED 配置(根据实际接线调整)
i2c = I2C(0, scl=Pin(18), sda=Pin(23), freq=400000)
oled_width = 128
oled_height = 64
oled = SSD1306_I2C(oled_width, oled_height, i2c)
# 全局变量
wifi_connected = False
mqtt_connected = False
client = None
# ==================== 辅助函数 ====================
def log(message, is_error=False):
"""打印日志并在 OLED 显示"""
lt = time.localtime()
timestamp = f"{lt[0]}-{lt[1]:02d}-{lt[2]:02d} {lt[3]:02d}:{lt[4]:02d}:{lt[5]:02d}"
prefix = "[ERROR] " if is_error else "[INFO] "
print(f"{timestamp} {prefix}{message}")
# OLED 显示(显示最后两行日志)
oled.fill(0)
oled.text("MQ-2 Monitor", 0, 0)
oled.text(message[:20], 0, 20)
if len(message) > 20:
oled.text(message[20:40], 0, 30)
oled.show()
def reconnect_wifi():
"""重连 WiFi 逻辑"""
global wifi_connected
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
log("WiFi connecting...")
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
for i in range(20):
if wlan.isconnected():
break
time.sleep(1)
if wlan.isconnected():
log(f"WiFi connected: {wlan.ifconfig()[0]}")
wifi_connected = True
return True
else:
log("WiFi connect failed", is_error=True)
wifi_connected = False
return False
else:
wifi_connected = True
return True
def connect_mqtt():
"""连接 MQTT 服务器"""
global client, mqtt_connected
try:
def sub_callback(topic, msg):
topic_str = topic.decode()
msg_str = msg.decode()
log(f"Received [{topic_str}]: {msg_str}")
client = MQTTClient(
client_id=BEMFA_CLIENT_ID,
server=BEMFA_SERVER,
port=BEMFA_PORT,
keepalive=60
)
client.set_callback(sub_callback)
client.connect()
log("MQTT connected")
client.subscribe(BEMFA_TOPIC)
log(f"Subscribed: {BEMFA_TOPIC}")
client.publish(BEMFA_ONLINE_TOPIC, "1", retain=True)
log(f"Published online status")
mqtt_connected = True
return True
except Exception as e:
log(f"MQTT connect failed: {e}", is_error=True)
mqtt_connected = False
return False
def read_mq2():
"""读取 MQ-2 烟雾浓度"""
try:
# 读取ADC值并转换为ppm(简化处理,实际需要校准)
adc_value = mq2_adc.read()
# 模拟转换公式(根据实际传感器特性调整)
ppm = int(adc_value / 4095 * 1000) # 转换为0-1000ppm范围
log(f"MQ2 reading: {adc_value} (ADC), {ppm} ppm")
return ppm
except Exception as e:
log(f"MQ2 read failed: {e}", is_error=True)
return None
def publish_data():
"""发布数据到巴法云 MQTT"""
global mqtt_connected
if not wifi_connected or not mqtt_connected:
log("WiFi/MQTT not connected")
return False
try:
ppm = read_mq2()
if ppm is None:
return False
# 构造JSON数据
data = {
"smoke_level": ppm,
"status": get_smoke_status(ppm),
"timestamp": time.time()
}
json_data = json.dumps(data)
# 发布数据
client.publish(BEMFA_TOPIC, json_data)
log(f"Published: {json_data}")
# 更新在线状态
client.publish(BEMFA_ONLINE_TOPIC, "1", retain=True)
return True
except Exception as e:
log(f"Publish failed: {e}", is_error=True)
mqtt_connected = False
try:
client.disconnect()
except:
pass
return False
def get_smoke_status(ppm):
"""获取烟雾状态描述"""
if ppm < 200:
return "Clean"
elif 200 <= ppm < 500:
return "Low"
elif 500 <= ppm < 800:
return "Medium"
else:
return "Danger"
def check_connections(timer):
"""定时器回调:检查连接状态并维持心跳"""
global wifi_connected, mqtt_connected, client
wlan = network.WLAN(network.STA_IF)
# 检查 WiFi 连接
if not wlan.isconnected():
wifi_connected = False
log("WiFi disconnected")
reconnect_wifi()
# 检查 MQTT 连接
if wifi_connected and not mqtt_connected:
log("MQTT disconnected")
connect_mqtt()
# 维持 MQTT 心跳
if mqtt_connected and client:
try:
client.ping()
client.check_msg()
except Exception as e:
log(f"MQTT heartbeat failed: {e}", is_error=True)
mqtt_connected = False
# ==================== 主函数 ====================
def main():
log("System starting...")
# 连接 WiFi
if not reconnect_wifi():
log("Initial WiFi failed", is_error=True)
return
# 连接 MQTT
if not connect_mqtt():
log("Initial MQTT failed", is_error=True)
return
# 启动定时器检查连接
timer = Timer(-1)
timer.init(period=30000, mode=Timer.PERIODIC, callback=check_connections)
try:
# 主循环发布数据
while True:
publish_data()
time.sleep(10)
except KeyboardInterrupt:
log("Program interrupted")
except Exception as e:
log(f"Program error: {e}", is_error=True)
finally:
# 清理资源
if client and mqtt_connected:
client.publish(BEMFA_ONLINE_TOPIC, "0", retain=True)
log("Published offline status")
client.disconnect()
log("MQTT disconnected")
if 'timer' in locals():
timer.deinit()
log("Timer stopped")
log("System stopped")
if __name__ == "__main__":
main()