from machine import Pin
import time
import network
from umqtt.simple import MQTTClient
# ===================== 【请修改这里为你的巴法云信息】 =====================
WIFI_SSID = "Wokwi-GUEST" # 仿真平台虚拟WiFi名
WIFI_PWD = "" # 仿真平台虚拟WiFi密码
MQTT_CLIENT_ID = "07b99e09b5ac40fe60a8761ce477e8ba" # 随便填英文数字,唯一即可
MQTT_SERVER = "bemfa.com" # 巴法云MQTT地址(固定)
MQTT_PORT = 9501 # 固定端口
MQTT_USER = "07b99e09b5ac40fe60a8761ce477e8ba" # 巴法云控制台右上角复制
LED_TOPIC = "nLLiKfnQ9006" # 主题名,巴法云里创建
# =========================================================================
# 8个LED引脚(和你原来一致)
led_pins = [15, 2, 0, 4, 16, 17, 5, 18]
leds = [Pin(pin, Pin.OUT) for pin in led_pins]
# 全局模式:0=手动控制 1=自动流水灯
mode = 0
# -------------------------- WiFi连接 --------------------------
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print("正在连接WiFi...")
wlan.connect(WIFI_SSID, WIFI_PWD)
while not wlan.isconnected():
time.sleep(0.5)
print("WiFi连接成功!IP:", wlan.ifconfig()[0])
# -------------------------- MQTT消息回调 --------------------------
def mqtt_callback(topic, msg):
global mode
topic = topic.decode()
msg = msg.decode().lower()
print("收到消息:", topic, msg)
# 指令1:打开所有灯
if msg == "on":
for led in leds:
led.value(1)
# 指令2:关闭所有灯
elif msg == "off":
for led in leds:
led.value(0)
# 指令3:启动自动流水灯
elif msg == "auto":
mode = 1
# 指令4:停止自动模式,回到手动
elif msg == "stop":
mode = 0
# -------------------------- MQTT连接 --------------------------
def connect_mqtt():
client = MQTTClient(
MQTT_CLIENT_ID,
MQTT_SERVER,
MQTT_PORT,
MQTT_USER,
MQTT_USER
)
client.set_callback(mqtt_callback)
client.connect()
client.subscribe(LED_TOPIC)
print("巴法云MQTT连接成功!")
return client
# -------------------------- 自动流水灯函数 --------------------------
def auto_run():
for i in range(8):
if mode != 1: # 模式被切换就立刻退出
break
# 全部熄灭
for led in leds:
led.value(0)
# 点亮当前LED
leds[i].value(1)
time.sleep(0.2)
# ===================== 主程序 =====================
if __name__ == "__main__":
try:
# 初始化
connect_wifi()
client = connect_mqtt()
# 初始全部关灯
for led in leds:
led.value(0)
print("系统启动完成,等待远程指令...")
while True:
# 检查MQTT消息
client.check_msg()
# 如果是自动模式,运行流水灯
if mode == 1:
auto_run()
time.sleep(0.05)
except KeyboardInterrupt:
# 退出全部关灯
for led in leds:
led.value(0)
print("\n程序停止")
# from machine import Pin
# import time
# # 8个LED对应的GPIO引脚(和电路图一致)
# led_pins = [15, 2,0, 4, 16, 17, 5, 18, 19]
# leds = [Pin(pin, Pin.OUT) for pin in led_pins]
# def single_led_blink():
# while True:
# # 遍历每一个灯,一次只亮一个
# for i in range(8):
# # 先把所有灯熄灭
# for led in leds:
# led.value(0)
# # 只点亮当前这一个灯
# leds[i].value(1)
# # 保持亮0.2秒(可自己改速度)
# time.sleep(0.2)
# if __name__ == "__main__":
# print("单灯依次闪烁程序启动...")
# try:
# single_led_blink()
# except KeyboardInterrupt:
# # 退出时全部熄灭
# for led in leds:
# led.value(0)
# print("\n程序停止,所有灯已熄灭")