import machine
import time
import network
from umqtt.simple import MQTTClient
import ssd1306 # OLED 驱动(Wokwi 需提前上传)
from machine import SoftI2C
# Wokwi 仿真环境配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
BROKER = "bemfa.com"
PORT = 9504
CLIENT_ID = "4c0da3b2d430b9f491fa15eabeb3ea21" # 替换为你的巴法云 Client ID
TOPIC_LED = "led"
LED_PIN = 22
# OLED 初始化
OLED_WIDTH = 128
OLED_HEIGHT = 64
OLED_SDA = 26
OLED_SCL = 27
i2c = SoftI2C(sda=machine.Pin(OLED_SDA), scl=machine.Pin(OLED_SCL))
oled = ssd1306.SSD1306_I2C(OLED_WIDTH, OLED_HEIGHT, i2c, addr=0x3C)
# 全局变量
led = machine.Pin(LED_PIN, machine.Pin.OUT)
current_state = "off"
client = None # MQTT 客户端
def display_info(line1, line2="", line3=""):
"""OLED 显示函数(最多3行)"""
oled.fill(0)
oled.text(line1, 0, 0)
oled.text(line2, 0, 16)
oled.text(line3, 0, 32)
oled.show()
def connect_wifi():
"""连接 Wokwi 仿真 WiFi"""
display_info("Connecting WiFi", WIFI_SSID)
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
while not wlan.isconnected():
time.sleep(0.5)
display_info("Connecting...", WIFI_SSID)
display_info("WiFi OK", f"IP: {wlan.ifconfig()[0]}")
print("WiFi 已连接")
def mqtt_callback(topic, msg):
"""MQTT 消息回调(处理网页端指令)"""
global current_state
msg_str = msg.decode()
print(f"收到指令: {msg_str}")
if msg_str == "on":
led.on()
current_state = "on"
elif msg_str == "off":
led.off()
current_state = "off"
# 更新 OLED 和 MQTT 状态
display_info("LED: " + current_state, "Topic: " + TOPIC)
client.publish(TOPIC, current_state) # 回传状态给网页
def connect_mqtt():
"""连接巴法云 MQTT 服务器"""
global client
try:
client = MQTTClient(CLIENT_ID, BROKER, PORT)
client.set_callback(mqtt_callback)
client.connect()
client.subscribe(TOPIC) # 订阅主题,接收网页指令
display_info("MQTT OK", BROKER, TOPIC)
print("MQTT 已连接")
return True
except Exception as e:
display_info("MQTT Err", str(e)[:16])
print("MQTT 连接失败:", e)
return False
def main():
connect_wifi()
if not connect_mqtt():
return
# 初始状态
led.off()
display_info("LED: off", "Topic: " + TOPIC)
client.publish(TOPIC, "off") # 上报初始状态
# 主循环(保持 MQTT 在线)
while True:
try:
client.check_msg() # 检查 MQTT 消息
time.sleep(0.1)
except Exception as e:
display_info("MQTT Err", str(e)[:16])
print("MQTT 异常:", e)
time.sleep(1)
connect_mqtt() # 尝试重连
if __name__ == "__main__":
main()