from machine import UART, Pin, reset
import time
import network
from umqtt.simple import MQTTClient
# ====================== 巴法云配置 ======================
SECRET_KEY = "369ba69d230e49bc9ad9825f0f4b1894" # 你的巴法云秘钥
MQTT_SERVER = "mqtt.bemfa.com" # 巴法云 MQTT 服务器
MQTT_PORT = 9501 # 端口号
TOPIC_LED = "led001" # LED控制主题名
# ====================== WiFi 配置 ======================
WIFI_SSID = "Wokwi-GUEST" # WiFi 名称
WIFI_PASSWORD = "" # WiFi 密码
# ====================== 硬件初始化 ======================
uart = UART(1, baudrate=115200, tx=1, rx=3) # 初始化 UART
led = Pin(15, Pin.OUT, value=0) # 初始化 LED
# ====================== 全局变量 ======================
mqtt_client = None # MQTT 客户端对象
wifi_connected = False # WiFi 连接状态
mqtt_connected = False # MQTT 连接状态
reconnect_interval = 5 # 重连间隔(秒)
last_attempt_time = 0 # 上次连接尝试时间
# ====================== WiFi 连接函数 ======================
def connect_wifi():
global wifi_connected
wlan = network.WLAN(network.STA_IF)
# 检查是否已连接
if wlan.isconnected():
wifi_connected = True
return True
wlan.active(True)
wlan.disconnect()
print(f"正在连接 WiFi:{WIFI_SSID}...")
uart.write(f"\rConnecting to WiFi: {WIFI_SSID}...\n")
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
# 等待连接成功或超时(30秒)
timeout = 30
while timeout > 0 and not wlan.isconnected():
time.sleep(1)
timeout -= 1
print(f"剩余连接时间:{timeout} 秒")
if wlan.isconnected():
print(f"WiFi 连接成功!IP:{wlan.ifconfig()[0]}")
uart.write("\rWiFi connected successfully!\n")
wifi_connected = True
return True
else:
print("WiFi 连接失败!")
uart.write("\rWiFi connection failed!\n")
wifi_connected = False
return False
# ====================== MQTT 连接函数 ======================
def connect_mqtt():
global mqtt_client, mqtt_connected
if not wifi_connected:
print("WiFi未连接,无法连接MQTT!")
uart.write("\rWiFi not connected, cannot connect to MQTT!\n")
return False
try:
# 客户端 ID 固定为秘钥
mqtt_client = MQTTClient(SECRET_KEY, MQTT_SERVER, port=MQTT_PORT)
mqtt_client.set_callback(mqtt_callback)
mqtt_client.connect()
mqtt_client.subscribe(TOPIC_LED)
print("MQTT 连接到巴法云成功!")
uart.write("\rMQTT connected to BafaCloud successfully!\n")
mqtt_connected = True
return True
except Exception as e:
print(f"MQTT 连接失败:{e}")
uart.write(f"\rMQTT connection failed: {e}\n")
mqtt_connected = False
return False
# ====================== MQTT 回调函数 ======================
def mqtt_callback(topic, msg):
print(f"收到MQTT消息 - 主题: {topic.decode()}, 消息: {msg.decode()}")
uart.write(f"\rMQTT message received: {msg.decode()}\n")
if msg.decode() == 'on':
led.value(1)
uart.write("\rLED is ON.\n")
update_led_status() # 同步状态
elif msg.decode() == 'off':
led.value(0)
uart.write("\rLED is OFF.\n")
update_led_status() # 同步状态
else:
uart.write("\rInvalid MQTT command!\n")
# ====================== 更新LED状态到巴法云 ======================
def update_led_status():
if not mqtt_connected or not mqtt_client:
return
try:
status = "on" if led.value() else "off"
mqtt_client.publish(TOPIC_LED, status)
print(f"已更新LED状态到巴法云: {status}")
except Exception as e:
print(f"发布LED状态失败: {e}")
check_connection() # 检查连接状态
# ====================== 检查连接状态并尝试重连 ======================
def check_connection():
global last_attempt_time
current_time = time.time()
# 避免频繁尝试重连
if current_time - last_attempt_time < reconnect_interval:
return
last_attempt_time = current_time
# 检查WiFi连接
wlan = network.WLAN(network.STA_IF)
if not wlan.isconnected():
print("WiFi连接丢失,尝试重连...")
connect_wifi()
# 检查MQTT连接
if wifi_connected and (not mqtt_connected or not mqtt_client):
print("MQTT连接丢失,尝试重连...")
connect_mqtt()
# ====================== 处理串口输入 ======================
def handle_uart_input():
if uart.any():
data = uart.read().decode().strip()
print(f"收到串口输入: {data}")
if data == '1':
led.value(1)
uart.write("\rUART: LED is ON.\n")
update_led_status() # 更新状态到巴法云
elif data == '0':
led.value(0)
uart.write("\rUART: LED is OFF.\n")
update_led_status() # 更新状态到巴法云
elif data.lower() == 'status':
status = "ON" if led.value() else "OFF"
uart.write(f"\rLED Status: {status}\n")
else:
uart.write("\r无效输入! 请输入1/0控制LED,或输入status查看状态.\n")
# ====================== 主程序 ======================
if __name__ == "__main__":
uart.write("\rLED Control System Started.\n")
uart.write("Enter 1 to turn ON LED, 0 to turn OFF, status to check status.\n")
try:
# 初始连接
connect_wifi()
if wifi_connected:
connect_mqtt()
while True:
try:
# 处理串口输入
handle_uart_input()
# 检查MQTT消息
if mqtt_connected and mqtt_client:
mqtt_client.check_msg()
# 定期检查连接状态
check_connection()
# 短暂延时,避免CPU占用过高
time.sleep(0.1)
except Exception as e:
print(f"主循环异常: {e}")
time.sleep(1)
except KeyboardInterrupt:
print("\n程序停止,断开连接...")
uart.write("\rProgram stopped.\n")
if mqtt_client:
try:
mqtt_client.disconnect()
except:
pass
led.value(0) # 确保LED关闭