from machine import UART, Pin
import network
import time
from umqtt.simple import MQTTClient # 导入 MQTT 客户端库
# 配置参数
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
BAFA_CLIENT_ID = "8902b6ba8c33a1146f9b223a15e704b2" # 巴法云私钥 = MQTT Client ID
BAFA_TOPIC = "led" # 巴法云主题
MQTT_BROKER = "bemfa.com"
MQTT_PORT = 9501
# 初始化硬件
uart = UART(1, baudrate=115200, tx=1, rx=3)
led = Pin(15, Pin.OUT)
led_status = "off" # 记录LED状态
# MQTT 客户端对象(全局)
mqtt_client = None
# 连接 WiFi
def connect_wifi():
uart.write(b"\rConnecting to WiFi...\n")
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
max_wait = 20
while max_wait > 0:
if wlan.isconnected():
break
max_wait -= 1
time.sleep(1)
uart.write(b".")
if wlan.isconnected():
uart.write(b"\rWiFi connected! IP: " + wlan.ifconfig()[0] + b"\n")
return True
else:
uart.write(b"\rWiFi failed!\n")
return False
# 连接 MQTT 服务器(巴法云)
def connect_mqtt():
global mqtt_client
uart.write(b"\rConnecting to MQTT Broker...\n")
# 初始化 MQTT 客户端
mqtt_client = MQTTClient(
client_id=BAFA_CLIENT_ID,
server=MQTT_BROKER,
port=MQTT_PORT,
keepalive=60 # 心跳间隔
)
try:
mqtt_client.connect()
uart.write(b"\rMQTT connected! Ready to send/receive.\n")
return True
except Exception as e:
uart.write(b"\rMQTT connect failed: " + str(e).encode() + b"\n")
return False
# MQTT 消息回调(订阅主题时会触发)
def on_mqtt_message(topic, msg):
uart.write(b"\rReceived MQTT message: " + topic + b" -> " + msg + b"\n")
# 若需要接收平台指令控制LED,可在此解析 msg(比如 msg == b"on" 则开灯)
# 控制 LED 并同步到巴法云(MQTT 发布)
def control_led(status):
global led_status
if status == "on":
led.value(1)
led_status = "on"
uart.write(b"\rLED ON (Input 1)\n")
elif status == "off":
led.value(0)
led_status = "off"
uart.write(b"\rLED OFF (Input 0)\n")
else:
uart.write(b"\rInvalid input! Use '1' or '0'.\n")
return
# 发布状态到巴法云 MQTT 主题
mqtt_client.publish(BAFA_TOPIC, led_status)
uart.write(b"\rSent to Bemfa: " + led_status.encode() + b"\n")
# 主程序逻辑
def main():
# 1. 连接 WiFi
if not connect_wifi():
return
# 2. 连接 MQTT 服务器
if not connect_mqtt():
return
# 3. 订阅主题(可选:若需要接收平台指令)
mqtt_client.set_callback(on_mqtt_message)
mqtt_client.subscribe(BAFA_TOPIC) # 订阅自身主题,也可订阅其他主题
uart.write(b"\rSubscribed to topic: " + BAFA_TOPIC + b"\n")
# 4. 主循环
uart.write(b"\rSystem ready! Enter '1'/'0' to control LED.\n")
while True:
# 检查串口输入(仿真平台输入)
if uart.any():
data = uart.read().strip() # 读取串口数据
if data == b"1":
control_led("on")
elif data == b"0":
control_led("off")
else:
uart.write(b"\rInvalid input! Try '1' (ON) or '0' (OFF).\n")
# 检查 MQTT 消息(非阻塞)
if mqtt_client != None:
mqtt_client.check_msg()
time.sleep(0.1) # 降低 CPU 占用
# 启动程序
if __name__ == "__main__":
main()