from machine import Pin
import time
import network
from umqtt.simple import MQTTClient
import gc # 垃圾回收,优化内存使用
# ==================== 配置参数(需要你修改)====================
# WiFi配置
WIFI_SSID = "Wokwi-GUEST"
WIFI_PASSWORD = ""
# 巴法云MQTT配置
BAFA_CLIENT_ID = "52ec0c74346341d49688088aedcfa4cb" # 替换为你的巴法云私钥
BAFA_MQTT_SERVER = "bemfa.com" # 固定
BAFA_MQTT_PORT = 9501 # 固定
SUB_TOPIC = "led001" # 替换为你在巴法云创建的主题
# LED配置
LED_PIN = 2 # ESP32/ESP8266的GPIO2,通常有板载LED
led1 = Pin(LED_PIN, Pin.OUT)
led1.value(0) # 默认关闭LED
# ==================== 核心功能函数 ====================
# 1. 连接WiFi(带重连机制)
def connect_wifi(max_retry=10):
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# 如果已连接,直接返回
if wlan.isconnected():
print("WiFi已连接,IP:", wlan.ifconfig()[0])
return wlan
# 尝试连接WiFi
print(f"正在连接WiFi: {WIFI_SSID}")
wlan.connect(WIFI_SSID, WIFI_PASSWORD)
retry_count = 0
while not wlan.isconnected() and retry_count < max_retry:
time.sleep(0.5)
retry_count += 1
print(f"连接中...({retry_count}/{max_retry})")
if wlan.isconnected():
print("WiFi连接成功!IP:", wlan.ifconfig()[0])
return wlan
else:
print("WiFi连接失败!")
wlan.active(False)
return None
# 2. MQTT消息回调函数
def mqtt_callback(topic, msg):
"""
收到MQTT消息后的处理逻辑
topic: 订阅的主题(字节类型)
msg: 收到的消息(字节类型)
"""
try:
topic_str = topic.decode('utf-8')
msg_str = msg.decode('utf-8').strip() # 去除首尾空格
print(f"收到消息:主题={topic_str},内容={msg_str}")
# 控制LED逻辑
if msg_str in ["on", "1"]:
led1.value(1)
print("✅ LED已点亮")
elif msg_str in ["off", "0"]:
led1.value(0)
print("❌ LED已熄灭")
else:
print(f"⚠️ 未知指令:{msg_str},支持:on/1(开灯)、off/0(关灯)")
except Exception as e:
print(f"消息处理失败:{e}")
# 3. 连接MQTT服务器
def connect_mqtt():
# 垃圾回收,释放内存
gc.collect()
# 创建MQTT客户端
client = MQTTClient(
client_id=BAFA_CLIENT_ID,
server=BAFA_MQTT_SERVER,
port=BAFA_MQTT_PORT,
keepalive=60,
ssl=False # 巴法云MQTT不使用SSL
)
# 设置回调函数
client.set_callback(mqtt_callback)
# 尝试连接
try:
client.connect()
client.subscribe(SUB_TOPIC)
print(f"✅ MQTT连接成功,已订阅主题:{SUB_TOPIC}")
return client
except Exception as e:
print(f"❌ MQTT连接失败:{e}")
# 关闭客户端连接(如果有)
try:
client.disconnect()
except:
pass
return None
# 4. 检查并重建连接
def check_connections():
global wlan, client
# 检查WiFi连接
if not wlan or not wlan.isconnected():
wlan = connect_wifi()
if not wlan:
return False
# 检查MQTT连接
if not client:
client = connect_mqtt()
return True
# ==================== 主程序 ====================
if __name__ == "__main__":
wlan = None
client = None
# 初始化连接
print("========== 程序启动 ==========")
check_connections()
# 主循环
print("等待接收控制指令...(按Ctrl+C退出)")
try:
while True:
# 检查连接状态
if not check_connections():
print("⚠️ 连接异常,5秒后重试")
time.sleep(5)
continue
# 非阻塞检查MQTT消息
try:
client.check_msg()
# 定期发送心跳(每30秒)
if time.time() % 30 == 0:
client.ping()
except Exception as e:
print(f"⚠️ MQTT通信异常:{e}")
client = None # 标记为断开,触发重连
# 降低CPU占用
time.sleep(0.1)
# 捕获退出信号,优雅关闭
except KeyboardInterrupt:
print("\n========== 程序退出 ==========")
led1.value(0) # 关闭LED
if client:
try:
client.disconnect()
except:
pass
if wlan:
wlan.active(False)
print("资源已释放,程序正常退出")