from machine import Pin, ADC, SoftI2C # 改用 SoftI2C
from time import sleep
import network
from umqtt.simple import MQTTClient
import ssd1306
# 巴法云平台配置
SERVER = "bemfa.com"
PORT = 9501
PRIVATE_KEY = "c34f996c831c59799e2b0b5ef4c0aac9"
TOPIC = "MQ2"
# WiFi 配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# 硬件引脚配置
I2C_SCL = Pin(18)
I2C_SDA = Pin(19)
SMOKE_PIN = 34
OLED_WIDTH = 128
OLED_HEIGHT = 64
# 初始化 SoftI2C 和 OLED
i2c = SoftI2C(scl=I2C_SCL, sda=I2C_SDA) # 软件模拟 I2C
oled = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c, addr=0x3C) # 尝试 0x3C/0x3D
# 传感器配置
gas_ADC = ADC(Pin(SMOKE_PIN))
gas_ADC.atten(ADC.ATTN_11DB)
SMOKE_THRESHOLD = 2000
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print("正在连接WiFi...")
oled.fill(0)
oled.text("Connecting WiFi...", 0, 0)
oled.show()
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
while not wlan.isconnected():
pass
print(f"WiFi连接成功: {wlan.ifconfig()}")
oled.fill(0)
oled.text("WiFi Connected!", 0, 0)
oled.text(str(wlan.ifconfig()), 0, 16)
oled.show()
sleep(1)
def sub_callback(topic, msg):
print(f"收到主题 {topic.decode()} 的消息: {msg.decode()}")
def init_mqtt():
client = MQTTClient(PRIVATE_KEY, SERVER, PORT)
client.set_callback(sub_callback)
try:
client.connect()
client.subscribe(TOPIC.encode())
print(f"已连接到MQTT服务器 {SERVER}")
oled.fill(0)
oled.text("MQTT Connected!", 0, 0)
oled.text(f"Topic: {TOPIC}", 0, 16)
oled.show()
sleep(1)
return client
except Exception as e:
print(f"MQTT连接失败: {e}")
oled.fill(0)
oled.text("MQTT Failed!", 0, 0)
oled.text(str(e), 0, 16)
oled.show()
sleep(1)
return None
def update_oled_display(value, mqtt_status):
oled.fill(0)
oled.text("MQ2 Sensor", 0, 0)
oled.text(f"Value: {value}", 0, 20)
status = "WARNING" if value > SMOKE_THRESHOLD else "NORMAL"
oled.text(f"Status: {status}", 0, 40)
mqtt_text = "ONLINE" if mqtt_status else "OFFLINE"
oled.text(f"MQTT: {mqtt_text}", 0, 50)
oled.show()
def main():
oled.fill(0)
oled.text("Initializing...", 0, 0)
oled.show()
sleep(1)
connect_wifi()
mqtt_client = init_mqtt()
if mqtt_client is None:
print("MQTT连接失败,仅本地显示")
while True:
try:
gas_value = gas_ADC.read()
print(f"传感器值: {gas_value}")
update_oled_display(gas_value, mqtt_client is not None)
if mqtt_client:
try:
mqtt_client.publish(TOPIC.encode(), str(gas_value).encode())
mqtt_client.check_msg()
print("MQTT数据已发布")
except Exception as e:
print(f"MQTT发布失败: {e}")
mqtt_client = init_mqtt()
sleep(2)
except Exception as e:
print(f"程序异常: {e}")
oled.fill(0)
oled.text("ERROR", 0, 0)
oled.text(str(e)[:16], 0, 20)
oled.show()
sleep(1)
if __name__ == "__main__":
main()