import network
import time
from machine import UART, Pin
from umqtt.simple import MQTTClient
# 网络配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# 巴法云MQTT配置
MQTT_BROKER = "bemfa.com"
MQTT_PORT = 9501
MQTT_CLIENT_ID = "8854b385911048edaa12808fb1f199e6" # 客户端ID,需保持唯一
MQTT_USERNAME = "+8618840167598" # 你的巴法云用户名
MQTT_PASSWORD = "8854b385911048edaa12808fb1f199e6" # 你的巴法云私钥
MQTT_TOPIC = "myled" # 主题名称
# 硬件配置
LED_PIN = 15 # LED连接的引脚
UART_ID = 1 # UART端口ID
BAUDRATE = 115200 # 波特率
# 全局变量
mqtt_client = None
led_state = "OFF" # 初始状态
led = None
uart = None
# 连接WiFi
def connect_wifi():
print("正在连接WiFi...")
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
timeout = 0
while not wlan.isconnected() and timeout < 10:
print(".", end="")
time.sleep(1)
timeout += 1
if wlan.isconnected():
print("\nWiFi连接成功")
print(f"IP地址: {wlan.ifconfig()[0]}")
return True
else:
print("\nWiFi连接失败")
return False
# 连接MQTT服务器
def connect_mqtt():
global mqtt_client
try:
print("正在连接MQTT服务器...")
mqtt_client = MQTTClient(
client_id=MQTT_CLIENT_ID,
server=MQTT_BROKER,
port=MQTT_PORT,
user=MQTT_USERNAME,
password=MQTT_PASSWORD,
keepalive=60
)
mqtt_client.connect()
print("MQTT连接成功")
publish_status() # 发布初始状态
return True
except Exception as e:
print(f"MQTT连接失败: {e}")
return False
# 发布LED状态到MQTT
def publish_status():
global mqtt_client, led_state
try:
if mqtt_client:
mqtt_client.publish(MQTT_TOPIC, led_state, retain=True)
print(f"已发布状态: {led_state}")
except Exception as e:
print(f"发布状态失败: {e}")
# 尝试重新连接
connect_mqtt()
# 主程序
def main():
global led_state, led, uart
# 初始化硬件
uart = UART(UART_ID, BAUDRATE)
led = Pin(LED_PIN, Pin.OUT)
led.off() # 初始状态为关闭
# 连接网络
if not connect_wifi():
print("WiFi连接失败,程序退出")
return
# 连接MQTT
if not connect_mqtt():
print("MQTT连接失败,程序将继续运行但无法同步状态")
# 欢迎信息
uart.write(b"\r\n=== LED控制程序 ===\r\n")
uart.write(b"输入1: 打开LED\r\n")
uart.write(b"输入0: 关闭LED\r\n")
uart.write(b"输入s: 查询当前状态\r\n")
uart.write(b"-------------------\r\n")
# 主循环
while True:
# 处理串口输入
if uart.any():
data = uart.read().strip()
print(f"收到命令: {data}")
if data == b'1':
led.on()
led_state = "ON"
uart.write(b"LED已打开\r\n")
publish_status()
elif data == b'0':
led.off()
led_state = "OFF"
uart.write(b"LED已关闭\r\n")
publish_status()
elif data == b's':
status_msg = f"LED当前状态: {led_state}\r\n"
uart.write(status_msg.encode())
else:
uart.write(b"无效命令,请输入1、0或s\r\n")
# 检查MQTT连接状态
if mqtt_client:
try:
mqtt_client.ping() # 发送心跳包
except Exception as e:
print(f"MQTT连接丢失: {e}")
connect_mqtt() # 尝试重新连接
time.sleep(0.1) # 减少CPU占用
if __name__ == "__main__":
main()