import machine
from umqtt.simple import MQTTClient
import time
from machine import Timer
import dht
from machine import SoftI2C, I2C, Pin, ADC
import ssd1306
import network
# 配置参数
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
CLIENT_ID = "5b7a4f1f36924d5ab799d0ca8a943999" # 巴法云私钥
# 各传感器对应的巴法云主题
MQTT_TOPIC_TEMP = f"temp" # 温度主题
MQTT_TOPIC_HUMI = f"humi" # 湿度主题
MQTT_TOPIC_GAS = f"gas" # 气体(MQ2)主题
MQTT_TOPIC_LIGHT = f"light" # 光照(LDR)主题
MQTT_TOPIC_DIST = f"dist" # 距离(HC - SR04)主题
MQTT_TOPIC_PIR = f"pir" # 人体红外(PIR)主题
# 服务器配置
MQTT_SERVER = "bemfa.com"
MQTT_PORT = 9501
# 硬件配置
DHT_PIN = machine.Pin(14)
OLED_SCL = machine.Pin(22)
OLED_SDA = machine.Pin(21)
UPDATE_INTERVAL = 5000
PING_INTERVAL = 30000
# HC - SR04传感器配置
TRIG_PIN = machine.Pin(12, machine.Pin.OUT)
ECHO_PIN = machine.Pin(13, machine.Pin.IN)
# 蜂鸣器和LED配置
BUZZER_PIN = machine.Pin(25, machine.Pin.OUT)
LED_PIN = machine.Pin(26, machine.Pin.OUT)
# LDR光敏传感器配置
LIGHT_AO_PIN = 34
adc_light = machine.ADC(machine.Pin(LIGHT_AO_PIN))
adc_light.atten(machine.ADC.ATTN_11DB)
# MQ2传感器配置
MQ2_AO_PIN = 35
adc_mq2 = machine.ADC(machine.Pin(MQ2_AO_PIN))
adc_mq2.atten(machine.ADC.ATTN_11DB)
# PIR人体红外传感器配置
PIR_PIN = machine.Pin(15, machine.Pin.IN)
# 初始化设备
dht_sensor = dht.DHT22(DHT_PIN)
i2c = SoftI2C(scl=OLED_SCL, sda=OLED_SDA)
oled = ssd1306.SSD1306_I2C(128, 64, i2c)
# 全局变量
mqtt_client = None
ping_timer = None
wifi_connect_time = 0
mqtt_connect_time = 0
# WiFi连接
def connect_wifi():
global wifi_connect_time
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print("连接WiFi...")
sta_if.active(True)
sta_if.connect(WIFI_SSID, WIFI_PASSWORD)
max_wait = 15
while max_wait > 0 and not sta_if.isconnected():
time.sleep(1)
max_wait -= 1
print(f"等待WiFi连接...剩余尝试次数: {max_wait}")
if not sta_if.isconnected():
print("WiFi连接失败,重启设备...")
machine.reset()
else:
wifi_connect_time = time.ticks_ms()
print(f"网络配置: {sta_if.ifconfig()}")
print(f"WiFi连接成功,耗时: {time.ticks_diff(time.ticks_ms(), wifi_connect_time)}ms")
else:
print("WiFi已连接")
# MQTT消息回调
def message_callback(topic, msg):
print(f"收到消息: {topic.decode()} - {msg.decode()}")
# 初始化MQTT,订阅所有相关主题
def init_mqtt():
global mqtt_client, mqtt_connect_time
try:
print(f"尝试连接MQTT服务器: {MQTT_SERVER}:{MQTT_PORT}")
start_time = time.ticks_ms()
mqtt_client = MQTTClient(CLIENT_ID, MQTT_SERVER, MQTT_PORT, keepalive=60)
mqtt_client.set_callback(message_callback)
mqtt_client.connect()
# 订阅各个传感器数据对应的主题
mqtt_client.subscribe(MQTT_TOPIC_TEMP)
mqtt_client.subscribe(MQTT_TOPIC_HUMI)
mqtt_client.subscribe(MQTT_TOPIC_GAS)
mqtt_client.subscribe(MQTT_TOPIC_LIGHT)
mqtt_client.subscribe(MQTT_TOPIC_DIST)
mqtt_client.subscribe(MQTT_TOPIC_PIR)
mqtt_connect_time = time.ticks_ms()
print(f"已连接到MQTT服务器,耗时: {time.ticks_diff(mqtt_connect_time, start_time)}ms")
return True
except Exception as e:
print(f"MQTT连接失败: {e}")
return False
# 读取DHT22数据,获取温度和湿度
def read_dht_data():
try:
dht_sensor.measure()
return dht_sensor.temperature(), dht_sensor.humidity()
except OSError as e:
print(f"DHT22读取失败: {e}")
return None, None
# 读取MQ2数据(5次采样平均)
def read_mq2_data():
try:
samples = []
for _ in range(5):
samples.append(adc_mq2.read())
time.sleep_ms(50)
return sum(samples) // len(samples)
except OSError as e:
print(f"MQ2读取失败: {e}")
return None
# 读取光敏数据(5次采样平均)
def read_light_data():
try:
samples = []
for _ in range(5):
samples.append(adc_light.read())
time.sleep_ms(50)
return sum(samples) // len(samples)
except OSError as e:
print(f"光敏传感器读取失败: {e}")
return None
# 读取HC - SR04数据,计算距离
def read_hc_sr04_data():
try:
TRIG_PIN.value(0)
time.sleep_us(2)
TRIG_PIN.value(1)
time.sleep_us(10)
TRIG_PIN.value(0)
while ECHO_PIN.value() == 0:
signal_off = time.ticks_us()
while ECHO_PIN.value() == 1:
signal_on = time.ticks_us()
duration = signal_on - signal_off
return (duration * 0.0343) / 2 # 距离 (cm)
except OSError as e:
print(f"HC - SR04读取失败: {e}")
return None
# 读取PIR数据
def read_pir_data():
try:
return PIR_PIN.value() # 0=未检测,1=检测到
except OSError as e:
print(f"PIR读取失败: {e}")
return None
# 触发警报(蜂鸣器和LED)
def trigger_alert():
try:
BUZZER_PIN.value(1)
LED_PIN.value(1)
time.sleep_ms(500)
BUZZER_PIN.value(0)
LED_PIN.value(0)
except Exception as e:
print(f"警报触发失败: {e}")
# 更新OLED显示(多传感器数据)
def update_oled_display(temp, humi, mq2, light, distance, pir_state):
oled.fill(0)
oled.text("Env System", 0, 0)
oled.text(f"Temp: {temp:.1f}°C", 0, 10)
oled.text(f"Humi: {humi:.1f}%", 0, 20)
oled.text(f"Gas: {mq2}", 0, 30)
oled.text(f"Light: {light}", 0, 40)
oled.text(f"Dist: {distance:.1f}cm", 0, 50)
oled.text(f"PIR: {'检测到' if pir_state else '未检测'}", 0, 60 if 60 < 64 else 55)
oled.show()
# 发布数据到MQTT,分别上传到对应主题
def publish_to_mqtt(temp, humi, mq2, light, distance, pir_state):
global mqtt_client
publish_result = True
# 发布温度数据
if temp is not None:
try:
mqtt_client.publish(MQTT_TOPIC_TEMP, f"{temp:.1f}".encode())
print(f"已发布温度数据到 {MQTT_TOPIC_TEMP}")
except Exception as e:
print(f"温度数据发布失败: {e}")
publish_result = False
# 发布湿度数据
if humi is not None:
try:
mqtt_client.publish(MQTT_TOPIC_HUMI, f"{humi:.1f}".encode())
print(f"已发布湿度数据到 {MQTT_TOPIC_HUMI}")
except Exception as e:
print(f"湿度数据发布失败: {e}")
publish_result = False
# 发布气体数据
if mq2 is not None:
try:
mqtt_client.publish(MQTT_TOPIC_GAS, f"{mq2}".encode())
print(f"已发布气体数据到 {MQTT_TOPIC_GAS}")
except Exception as e:
print(f"气体数据发布失败: {e}")
publish_result = False
# 发布光照数据
if light is not None:
try:
mqtt_client.publish(MQTT_TOPIC_LIGHT, f"{light}".encode())
print(f"已发布光照数据到 {MQTT_TOPIC_LIGHT}")
except Exception as e:
print(f"光照数据发布失败: {e}")
publish_result = False
# 发布距离数据
if distance is not None:
try:
mqtt_client.publish(MQTT_TOPIC_DIST, f"{distance:.1f}".encode())
print(f"已发布距离数据到 {MQTT_TOPIC_DIST}")
except Exception as e:
print(f"距离数据发布失败: {e}")
publish_result = False
# 发布PIR数据
if pir_state is not None:
try:
mqtt_client.publish(MQTT_TOPIC_PIR, f"{pir_state}".encode())
print(f"已发布PIR数据到 {MQTT_TOPIC_PIR}")
except Exception as e:
print(f"PIR数据发布失败: {e}")
publish_result = False
return publish_result
# MQTT心跳
def send_mqtt_ping(timer):
global mqtt_client
if mqtt_client:
try:
print("发送MQTT心跳...")
mqtt_client.ping()
print("MQTT心跳发送成功")
except Exception as e:
print(f"MQTT心跳失败: {e}")
reconnect_mqtt()
else:
print("MQTT客户端未初始化,无法发送心跳")
reconnect_mqtt()
# 重新连接MQTT
def reconnect_mqtt():
global mqtt_client, ping_timer
print("尝试重新连接MQTT...")
try:
if mqtt_client:
print("关闭现有MQTT连接...")
mqtt_client.disconnect()
if ping_timer:
print("停止现有MQTT心跳定时器...")
ping_timer.deinit()
except Exception as e:
print(f"关闭MQTT连接时出错: {e}")
# 重置MQTT客户端
mqtt_client = None
# 先检查WiFi连接
connect_wifi()
# 尝试重新连接MQTT
if init_mqtt():
ping_timer = Timer(-1)
ping_timer.init(period=PING_INTERVAL, mode=Timer.PERIODIC, callback=send_mqtt_ping)
print("MQTT重新连接成功")
return True
else:
print("MQTT重新连接失败,5秒后重试...")
time.sleep(5)
return False
# 主函数
def main():
print("===== 环境监测系统启动 =====")
connect_wifi()
if not init_mqtt():
print("MQTT初始化失败,程序退出")
return
global ping_timer
ping_timer = Timer(-1)
ping_timer.init(period=PING_INTERVAL, mode=Timer.PERIODIC, callback=send_mqtt_ping)
print(f"系统启动完成,开始多传感器数据采集...更新间隔: {UPDATE_INTERVAL/1000}秒")
# OLED初始显示
oled.fill(0)
oled.text("System Starting...", 0, 30)
oled.show()
last_update = time.ticks_ms()
publish_failure_count = 0
try:
while True:
# 检查WiFi连接状态
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print("WiFi连接丢失,尝试重新连接...")
connect_wifi()
if mqtt_client:
try:
# 检查是否有MQTT消息
mqtt_client.check_msg()
except Exception as e:
print(f"检查MQTT消息时出错: {e}")
reconnect_mqtt()
current_time = time.ticks_ms()
if time.ticks_diff(current_time, last_update) >= UPDATE_INTERVAL:
last_update = current_time
print("开始读取传感器数据...")
# 读取各传感器数据
temp, humi = read_dht_data()
mq2 = read_mq2_data()
light = read_light_data()
distance = read_hc_sr04_data()
pir_state = read_pir_data()
# 检查所有传感器数据是否有效
if all([v is not None for v in [temp, humi, mq2, light, distance, pir_state]]):
print(f"传感器数据读取成功 - 温度: {temp:.1f}°C, 湿度: {humi:.1f}%, 气体: {mq2}, 光照: {light}, 距离: {distance:.1f}cm, PIR: {'检测到' if pir_state else '未检测'}")
# 更新OLED显示
update_oled_display(temp, humi, mq2, light, distance, pir_state)
# 发布数据到MQTT
publish_success = publish_to_mqtt(temp, humi, mq2, light, distance, pir_state)
# 异常触发(距离<20cm、气体>3000或检测到PIR)
if (distance is not None and distance < 20) or \
(mq2 is not None and mq2 > 3000) or \
(pir_state is not None and pir_state):
trigger_alert()
if publish_success:
publish_failure_count = 0
else:
publish_failure_count += 1
print(f"MQTT发布失败,累计失败次数: {publish_failure_count}")
# 如果连续3次发布失败,尝试重新连接
if publish_failure_count >= 3:
print("连续3次MQTT发布失败,尝试重新连接...")
reconnect_mqtt()
publish_failure_count = 0
else:
print("传感器数据读取失败,部分数据无效")
else:
# 短暂休眠,减少CPU负载
time.sleep_ms(100)
except Exception as e:
print(f"主程序异常: {e}")
print("尝试重新连接MQTT...")
reconnect_mqtt()
# 等待一段时间后重试
time.sleep(5)
main()
if __name__ == "__main__":
main()