import dht
import machine
import time
import ssd1306
from umqtt.simple import MQTTClient
import network
import uos
import sdcard
# ========== 硬件引脚配置 ==========
DHT_PIN = 14 # DHT22数据引脚
MQ2_AOUT_PIN = 34 # MQ2模拟输出(ADC引脚)
MQ2_DOUT_PIN = 35 # MQ2数字输出
LED_PIN = 2 # LED指示灯引脚
# OLED引脚
I2C_SCL = 5 # OLED I2C时钟
I2C_SDA = 4 # OLED I2C数据
OLED_WIDTH = 128
OLED_HEIGHT = 64
# SD卡引脚 (SPI接口)
SD_CS = 15 # SD卡片选引脚
SD_SCK = 18 # SPI时钟
SD_MOSI = 23 # SPI主机输出
SD_MISO = 19 # SPI主机输入
# ========== 网络配置 ==========
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASS = ""
MQTT_CLIENT_ID = "d2bc31d11dbd41a08255a2d5a60a1dee"
MQTT_SERVER = "bemfa.com"
MQTT_PORT = 9501
# ========== MQTT主题配置 ==========
TOPIC_TEMP = "mytemp" # 温度主题
TOPIC_HUMI = "humi" # 湿度主题
TOPIC_MQ2_AOUT = "MQ2AOUT" # MQ2模拟量主题
TOPIC_MQ2_DOUT = "MQ2DOUT" # MQ2数字量主题
TOPIC_SD = "sdcard" # SD卡主题
STATUS_TOPIC = "status" # 设备状态主题
# ========== 全局变量 ==========
ONLINE_MSG = "online"
OFFLINE_MSG = "offline"
SD_CARD_MOUNTED = False
SD_CARD = None
SPI = None
# ========== 硬件初始化 ==========
def init_hardware():
global SD_CARD, SD_CARD_MOUNTED, SPI
# 初始化OLED
i2c = machine.I2C(scl=machine.Pin(I2C_SCL), sda=machine.Pin(I2C_SDA))
oled = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c)
# 初始化MQ2数字引脚(上拉输入)
machine.Pin(MQ2_DOUT_PIN, machine.Pin.IN, machine.Pin.PULL_UP)
# 初始化LED引脚
machine.Pin(LED_PIN, machine.Pin.OUT).off()
# 初始化SD卡(SPI接口)
try:
SPI = machine.SPI(1,
baudrate=1000000,
sck=machine.Pin(SD_SCK),
mosi=machine.Pin(SD_MOSI),
miso=machine.Pin(SD_MISO))
SD_CARD = sdcard.SDCard(SPI, machine.Pin(SD_CS))
vfs = uos.VfsFat(SD_CARD)
uos.mount(vfs, "/sd")
SD_CARD_MOUNTED = True
print("SD card mounted successfully")
except Exception as e:
print("SD card init error:", e)
SD_CARD_MOUNTED = False
return oled
# ========== WiFi连接 ==========
def connect_wifi():
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print("Connecting to WiFi...")
sta_if.active(True)
sta_if.connect(WIFI_SSID, WIFI_PASS)
# 等待连接,最多10秒
for _ in range(10):
if sta_if.isconnected():
break
time.sleep(1)
else:
raise RuntimeError("WiFi connection failed")
print("WiFi connected:", sta_if.ifconfig())
# ========== MQTT回调函数 ==========
def mqtt_callback(topic, msg):
topic = topic.decode()
msg = msg.decode()
print(f"Received message - Topic: {topic}, Message: {msg}")
# 处理SD卡相关命令
if topic == TOPIC_SD:
if msg == "get_status":
publish_sd_status(mqtt_client)
elif msg == "list_files":
list_sd_files()
# ========== MQTT连接和订阅 ==========
def connect_mqtt():
global mqtt_client
try:
# 创建MQTT客户端并设置遗嘱消息
mqtt_client = MQTTClient(
MQTT_CLIENT_ID,
MQTT_SERVER,
MQTT_PORT,
keepalive=60
)
mqtt_client.set_callback(mqtt_callback)
mqtt_client.set_last_will(STATUS_TOPIC, OFFLINE_MSG, retain=True)
mqtt_client.connect()
# 订阅所有主题
mqtt_client.subscribe(TOPIC_TEMP)
mqtt_client.subscribe(TOPIC_HUMI)
mqtt_client.subscribe(TOPIC_MQ2_AOUT)
mqtt_client.subscribe(TOPIC_MQ2_DOUT)
mqtt_client.subscribe(TOPIC_SD)
# 发布在线状态
mqtt_client.publish(STATUS_TOPIC, ONLINE_MSG, retain=True)
print("Connected & subscribed to MQTT")
return mqtt_client
except Exception as e:
print("MQTT connection error:", e)
raise
# ========== SD卡操作函数 ==========
def get_sd_status():
if not SD_CARD_MOUNTED:
return {"status": "unmounted", "size": 0, "free": 0}
try:
fs_stat = uos.statvfs("/sd")
total_blocks = fs_stat[0] * fs_stat[2]
free_blocks = fs_stat[0] * fs_stat[3]
block_size = fs_stat[1] / 1024 # KB
return {
"status": "mounted",
"size": f"{total_blocks * block_size / 1024:.1f}MB",
"free": f"{free_blocks * block_size / 1024:.1f}MB",
"files": len(uos.listdir("/sd"))
}
except Exception as e:
print("SD card stat error:", e)
return {"status": "error", "error": str(e)}
def list_sd_files():
if not SD_CARD_MOUNTED:
return []
try:
files = uos.listdir("/sd")
print("SD card files:", files)
return files
except Exception as e:
print("List files error:", e)
return []
# ========== 读取所有传感器数据 ==========
def read_sensors():
data = {
'temp': None,
'humi': None,
'mq2_aout': None,
'mq2_dout': None,
'sd_status': get_sd_status()
}
# 读取DHT22温湿度
try:
d = dht.DHT22(machine.Pin(DHT_PIN))
d.measure()
data['temp'] = d.temperature()
data['humi'] = d.humidity()
print(f"DHT22读取值 - 温度: {data['temp']}C, 湿度: {data['humi']}%")
except Exception as e:
print("DHT22 error:", e)
# 读取MQ2模拟量
try:
adc = machine.ADC(machine.Pin(MQ2_AOUT_PIN))
adc.atten(machine.ADC.ATTN_11DB) # 3.3V量程
data['mq2_aout'] = adc.read()
print(f"MQ2模拟值: {data['mq2_aout']}")
# 控制LED灯
if data['mq2_aout'] > 2000: # 烟雾浓度阈值
machine.Pin(LED_PIN, machine.Pin.OUT).on()
print("烟雾浓度高,LED亮")
else:
machine.Pin(LED_PIN, machine.Pin.OUT).off()
except Exception as e:
print("MQ2 AOUT error:", e)
# 读取MQ2数字量
try:
data['mq2_dout'] = machine.Pin(MQ2_DOUT_PIN).value()
print(f"MQ2数字值: {data['mq2_dout']}")
except Exception as e:
print("MQ2 DOUT error:", e)
return data
# ========== OLED显示数据 ==========
def display_data(oled, data):
oled.fill(0)
oled.text("Env+SD Monitor", 0, 0)
# 第一行显示温湿度
if data['temp'] is not None and data['humi'] is not None:
oled.text(f"T:{data['temp']}C H:{data['humi']}%", 0, 15)
# 第二行显示MQ2数据
mq2_line = ""
if data['mq2_aout'] is not None:
mq2_line += f"A:{data['mq2_aout']} "
if data['mq2_dout'] is not None:
mq2_line += "D:ALARM" if data['mq2_dout'] == 0 else "D:SAFE"
oled.text(mq2_line, 0, 30)
# 第三行显示SD卡状态
sd_status = data['sd_status']
status_line = f"SD:{sd_status['status'][:4]} {sd_status['free']}F"
oled.text(status_line, 0, 45)
oled.show()
# ========== 发布SD卡状态 ==========
def publish_sd_status(client):
try:
status = get_sd_status()
client.publish(TOPIC_SD, f"STATUS:{str(status)}")
print("SD status published")
except Exception as e:
print("Publish SD status error:", e)
# ========== 发布传感器数据 ==========
def publish_sensor_data(client, data):
# 初始化计数器(第一次调用时初始化)
if not hasattr(publish_sensor_data, 'counter'):
publish_sensor_data.counter = 0
try:
# 发布温度数据 - 直接发布数值
if data['temp'] is not None:
client.publish(TOPIC_TEMP, str(data['temp']))
print(f"已发布温度: {data['temp']}")
# 发布湿度数据 - 直接发布数值
if data['humi'] is not None:
client.publish(TOPIC_HUMI, str(data['humi']))
print(f"已发布湿度: {data['humi']}")
# 发布MQ2数据 - 分开发布模拟量和数字量
if data['mq2_aout'] is not None:
client.publish(TOPIC_MQ2_AOUT, str(data['mq2_aout']))
print(f"已发布MQ2模拟量: {data['mq2_aout']}")
if data['mq2_dout'] is not None:
client.publish(TOPIC_MQ2_DOUT, str(data['mq2_dout']))
print(f"已发布MQ2数字量: {data['mq2_dout']}")
# 定期发布SD卡状态(每5次发布一次)
if publish_sensor_data.counter % 5 == 0:
publish_sd_status(client)
publish_sensor_data.counter += 1
print("传感器数据已发布")
except Exception as e:
print("发布错误:", e)
raise
# ========== 主程序 ==========
def main():
# 初始化硬件
oled = init_hardware()
oled.text("Initializing...", 0, 0)
oled.show()
# 连接网络和MQTT
try:
connect_wifi()
mqtt = connect_mqtt()
except Exception as e:
print("Initialization failed:", e)
oled.fill(0)
oled.text("Init Failed!", 0, 0)
oled.text("Retrying...", 0, 20)
oled.show()
time.sleep(5)
machine.reset()
# 显示连接成功
oled.fill(0)
oled.text("System Ready", 0, 0)
oled.text("WiFi: " + WIFI_SSID[:10], 0, 20)
oled.text("MQTT: Connected", 0, 40)
oled.show()
time.sleep(2)
# 主循环变量
last_publish = time.time()
last_ping = time.time()
publish_interval = 2 # 发布间隔(秒)
while True:
current_time = time.time()
# 1. 保持MQTT连接
if current_time - last_ping > 30: # 每30秒发送一次PING
try:
mqtt.ping()
last_ping = current_time
print("MQTT ping sent")
except:
try:
print("Reconnecting MQTT...")
mqtt.disconnect() # 确保先断开连接
mqtt = connect_mqtt()
except:
pass
# 2. 读取传感器数据
sensor_data = read_sensors()
# 3. 更新OLED显示
display_data(oled, sensor_data)
# 4. 定期发布数据
if current_time - last_publish >= publish_interval:
try:
publish_sensor_data(mqtt, sensor_data)
last_publish = current_time
except:
try:
print("Reconnecting MQTT and retrying...")
mqtt.disconnect() # 确保先断开连接
mqtt = connect_mqtt()
publish_sensor_data(mqtt, sensor_data)
except:
pass
# 5. 处理MQTT消息
try:
mqtt.check_msg()
except:
pass
# 短暂延时
time.sleep(0.1)
# ========== 程序入口 ==========
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nProgram stopped by user")
try:
mqtt_client.publish(STATUS_TOPIC, OFFLINE_MSG, retain=True)
mqtt_client.disconnect()
except:
pass
except Exception as e:
print("Critical error:", e)
machine.reset()